[ https://issues.apache.org/jira/browse/BEAM-7793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mike Pedersen updated BEAM-7793:
--------------------------------
Description:
If a BagState contains Rows, it appears to be empty when read in a timer.
GroupIntoBatches, for example, reads its BagState in a timer, so the following
test fails because the transform does not output anything:
{code:java}
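import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class BagStateTests implements Serializable {
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void canBatchRows() {
    Schema schema = Schema.builder().addInt32Field("a").build();
    // A single Row, keyed by 0, should come out of GroupIntoBatches as one batch.
    PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
        .apply(Create
            .of(Row.withSchema(schema).addValue(1).build())
            .withType(TypeDescriptors.rows()))
        .setRowSchema(schema)
        .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
        .apply(GroupIntoBatches.ofSize(10));
    // Fails: the output collection is empty instead of holding the batch.
    PAssert.that(pcoll).containsInAnyOrder(
        KV.of(0, Collections.singletonList(
            Row.withSchema(schema).addValue(1).build())));
    pipeline.run().waitUntilFinish();
  }
}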
{code}
The above uses GroupIntoBatches as an example, but the problem seems to affect
any DoFn that uses BagState.
Also, the above test might not be the best, since it assumes the iterable
emitted by GroupIntoBatches is a list. Regardless, the test should not fail
because the output collection is empty.
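To illustrate the caveat about assuming a list, here is a minimal sketch in
plain Java, with no Beam dependency. The class name IterableCompare and the
method sameElements are hypothetical helpers made up for this illustration;
they are not part of Beam or of the test above. The sketch only shows why
comparing a List with some other Iterable through equals() can fail even when
the elements match, and it assumes element order is significant, which may not
hold for real batches.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

public class IterableCompare {
    /** Returns true when both iterables yield equal elements in the same order. */
    public static <T> boolean sameElements(
            Iterable<? extends T> left, Iterable<? extends T> right) {
        Iterator<? extends T> l = left.iterator();
        Iterator<? extends T> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            if (!Objects.equals(l.next(), r.next())) {
                return false;
            }
        }
        // Equal only if both iterators run out at the same time.
        return !l.hasNext() && !r.hasNext();
    }

    public static void main(String[] args) {
        List<Integer> expected = Collections.singletonList(1);
        // Stand-in for a batch that is an Iterable but not a List.
        Iterable<Integer> batch = new ArrayDeque<Integer>(expected);
        System.out.println(expected.equals(batch));        // false: a List never equals a non-List
        System.out.println(sameElements(expected, batch)); // true: same elements, same order
    }
}
```

An assertion written against the elements rather than the container type would
not depend on which Iterable implementation the transform happens to emit.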
was:
If a BagState contains Rows, it appears to be empty when read in a timer. This
is for example used by GroupIntoBatches, causing the following to fail:
{code:java}
public class BagStateTests implements Serializable {
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void canBatchRows() {
    Schema schema = Schema.builder().addInt32Field("a").build();
    PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
        .apply(Create
            .of(Row.withSchema(schema).addValue(1).build())
            .withType(TypeDescriptors.rows()))
        .setRowSchema(schema)
        .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
        .apply(GroupIntoBatches.ofSize(10));
    PAssert.that(pcoll).containsInAnyOrder(
        KV.of(0, Collections.singletonList(
            Row.withSchema(schema).addValue(1).build())));
    pipeline.run().waitUntilFinish();
  }
}
{code}
The above uses GroupIntoBatches as an example, but the problem seems to be with
all DoFn having BagState.
> BagState drops Rows when triggered by timer
> -------------------------------------------
>
> Key: BEAM-7793
> URL: https://issues.apache.org/jira/browse/BEAM-7793
> Project: Beam
> Issue Type: Bug
> Components: runner-direct, sdk-java-core
> Affects Versions: 2.13.0
> Reporter: Mike Pedersen
> Priority: Major