[ 
https://issues.apache.org/jira/browse/BEAM-7793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Pedersen updated BEAM-7793:
--------------------------------
    Description: 
If a BagState contains Rows, it appears to be empty when read from a timer 
callback. GroupIntoBatches, for example, reads its BagState this way, causing 
the following test to fail:

{code:java}

public class BagStateTests implements Serializable {
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void canBatchRows() {
        Schema schema = Schema.builder().addInt32Field("a").build();

        PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
                .apply(Create
                        .of(Row.withSchema(schema).addValue(1).build())
                        .withType(TypeDescriptors.rows()))
                .setRowSchema(schema)
                .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
                .apply(GroupIntoBatches.ofSize(10));

        PAssert.that(pcoll).containsInAnyOrder(
                KV.of(0, Collections.singletonList(
                        Row.withSchema(schema).addValue(1).build()
                )));

        pipeline.run().waitUntilFinish();
    }
}
{code}

The above uses GroupIntoBatches as an example, but the problem seems to affect 
any DoFn that uses BagState.

  was:
If a BagState contains Rows, it appears to be empty when read from a timer 
callback. GroupIntoBatches, for example, reads its BagState this way, causing 
the following test to fail:

{code:java}
    @Test
    public void canBatchRows() {
        Schema schema = Schema.builder().addInt32Field("a").build();

        PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
                .apply(Create
                        .of(Row.withSchema(schema).addValue(1).build())
                        .withType(TypeDescriptors.rows()))
                .setRowSchema(schema)
                .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
                .apply(GroupIntoBatches.ofSize(10));

        PAssert.that(pcoll).containsInAnyOrder(
                KV.of(0, Collections.singletonList(
                        Row.withSchema(schema).addValue(1).build()
                )));

        pipeline.run().waitUntilFinish();
    }
{code}

The above uses GroupIntoBatches as an example, but the problem seems to affect 
any DoFn that uses BagState.


> BagState drops Rows when triggered by timer
> -------------------------------------------
>
>                 Key: BEAM-7793
>                 URL: https://issues.apache.org/jira/browse/BEAM-7793
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct, sdk-java-core
>    Affects Versions: 2.13.0
>            Reporter: Mike Pedersen
>            Priority: Major
>
> If a BagState contains Rows, it appears to be empty when read from a timer 
> callback. GroupIntoBatches, for example, reads its BagState this way, 
> causing the following test to fail:
> {code:java}
> public class BagStateTests implements Serializable {
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>     @Test
>     public void canBatchRows() {
>         Schema schema = Schema.builder().addInt32Field("a").build();
>         PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
>                 .apply(Create
>                         .of(Row.withSchema(schema).addValue(1).build())
>                         .withType(TypeDescriptors.rows()))
>                 .setRowSchema(schema)
>                 .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
>                 .apply(GroupIntoBatches.ofSize(10));
>         PAssert.that(pcoll).containsInAnyOrder(
>                 KV.of(0, Collections.singletonList(
>                         Row.withSchema(schema).addValue(1).build()
>                 )));
>         pipeline.run().waitUntilFinish();
>     }
> }
> {code}
> The above uses GroupIntoBatches as an example, but the problem seems to 
> affect any DoFn that uses BagState.



