[ https://issues.apache.org/jira/browse/BEAM-7793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mike Pedersen updated BEAM-7793:
--------------------------------
    Description: 
If a BagState contains Rows, it appears to be empty when read in a timer. 
GroupIntoBatches relies on this pattern, so the following test fails because the 
GroupIntoBatches transform outputs nothing:

{code:java}
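import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class BagStateTests implements Serializable {
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void canBatchRows() {
        Schema schema = Schema.builder().addInt32Field("a").build();

        PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
                .apply(Create
                        .of(Row.withSchema(schema).addValue(1).build())
                        .withType(TypeDescriptors.rows()))
                .setRowSchema(schema)
                .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
                // With one element and a batch size of 10, the batch can only be
                // emitted by GroupIntoBatches' end-of-window timer.
                .apply(GroupIntoBatches.ofSize(10));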

        PAssert.that(pcoll).containsInAnyOrder(
                KV.of(0, Collections.singletonList(
                        Row.withSchema(schema).addValue(1).build()
                )));

        pipeline.run().waitUntilFinish();
    }
}
{code}

The above uses GroupIntoBatches as an example, but the problem seems to affect 
any DoFn that holds Rows in a BagState.
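To illustrate the same pattern without GroupIntoBatches, here is a minimal sketch of a stateful DoFn that buffers Rows in a BagState and emits them from an event-time timer at the end of the window. The class name BufferRowsFn, the state and timer IDs, and keeping the key in a ValueState are assumptions made for illustration; this is not taken from the GroupIntoBatches source. If the behaviour described in this report applies, the buffer.read() call in the timer callback would come back empty on an affected version (the report lists 2.13.0).

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

/** Hypothetical reproducer: buffer Rows per key, flush them from an event-time timer. */
class BufferRowsFn extends DoFn<KV<Integer, Row>, KV<Integer, Iterable<Row>>> {

  // The key is kept in state because the timer callback does not receive the element.
  @StateId("key")
  private final StateSpec<ValueState<Integer>> keySpec = StateSpecs.value(VarIntCoder.of());

  @StateId("buffer")
  private final StateSpec<BagState<Row>> bufferSpec;

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  BufferRowsFn(Schema schema) {
    // Rows have no default coder, so the bag is given an explicit RowCoder.
    this.bufferSpec = StateSpecs.bag(RowCoder.of(schema));
  }

  @ProcessElement
  public void processElement(
      @Element KV<Integer, Row> element,
      BoundedWindow window,
      @StateId("key") ValueState<Integer> key,
      @StateId("buffer") BagState<Row> buffer,
      @TimerId("flush") Timer flush) {
    key.write(element.getKey());
    buffer.add(element.getValue());
    // Fire once the watermark passes the end of the window.
    flush.set(window.maxTimestamp());
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext c,
      @StateId("key") ValueState<Integer> key,
      @StateId("buffer") BagState<Row> buffer) {
    // Per this report, this read may come back empty when the bag holds Rows.
    List<Row> rows = new ArrayList<>();
    buffer.read().forEach(rows::add);
    if (!rows.isEmpty()) {
      c.output(KV.<Integer, Iterable<Row>>of(key.read(), rows));
    }
    buffer.clear();
  }
}
{code}

It would be applied to the keyed collection with ParDo.of(new BufferRowsFn(schema)); because Rows have no default coder, the output would likely also need an explicit coder such as KvCoder.of(VarIntCoder.of(), IterableCoder.of(RowCoder.of(schema))).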

Also, the test above may not be ideal, since it assumes the iterable output by 
GroupIntoBatches is a List. Regardless, the test should not fail because the 
output collection contains no values.
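Regarding the List assumption, a sketch of a container-agnostic check that could replace containsInAnyOrder, assuming a single expected batch with key 0. BatchChecks and singleBatchOf are hypothetical names. The check copies the batch's Iterable into a List before comparing, so it does not depend on the concrete Iterable type that GroupIntoBatches emits.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

/** Hypothetical helper for PAssert.satisfies that does not assume the batch is a List. */
class BatchChecks {
  static SerializableFunction<Iterable<KV<Integer, Iterable<Row>>>, Void> singleBatchOf(
      Row expected) {
    return batches -> {
      // Materialize the PCollection contents; expect exactly one batch.
      List<KV<Integer, Iterable<Row>>> all = new ArrayList<>();
      batches.forEach(all::add);
      if (all.size() != 1) {
        throw new AssertionError("expected exactly one batch, got " + all.size());
      }
      KV<Integer, Iterable<Row>> batch = all.get(0);
      if (!Integer.valueOf(0).equals(batch.getKey())) {
        throw new AssertionError("expected key 0, got " + batch.getKey());
      }
      // Copy whatever Iterable the runner produced into a List before comparing.
      List<Row> rows = new ArrayList<>();
      batch.getValue().forEach(rows::add);
      if (!rows.equals(Collections.singletonList(expected))) {
        throw new AssertionError("unexpected batch contents: " + rows);
      }
      return null;
    };
  }
}
{code}

In the test above it would be used as PAssert.that(pcoll).satisfies(BatchChecks.singleBatchOf(Row.withSchema(schema).addValue(1).build())). If the bag is read back empty, GroupIntoBatches emits no batch and this check fails with "expected exactly one batch, got 0", which states the symptom more directly than a list comparison.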

  was:
If a BagState contains Rows, it appears to be empty when read in a timer. 
GroupIntoBatches relies on this pattern, so the following test fails:

{code:java}
public class BagStateTests implements Serializable {
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void canBatchRows() {
        Schema schema = Schema.builder().addInt32Field("a").build();

        PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
                .apply(Create
                        .of(Row.withSchema(schema).addValue(1).build())
                        .withType(TypeDescriptors.rows()))
                .setRowSchema(schema)
                .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
                .apply(GroupIntoBatches.ofSize(10));

        PAssert.that(pcoll).containsInAnyOrder(
                KV.of(0, Collections.singletonList(
                        Row.withSchema(schema).addValue(1).build()
                )));

        pipeline.run().waitUntilFinish();
    }
}
{code}

The above uses GroupIntoBatches as an example, but the problem seems to affect 
any DoFn that holds Rows in a BagState.


> BagState drops Rows when triggered by timer
> -------------------------------------------
>
>                 Key: BEAM-7793
>                 URL: https://issues.apache.org/jira/browse/BEAM-7793
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct, sdk-java-core
>    Affects Versions: 2.13.0
>            Reporter: Mike Pedersen
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)