[ https://issues.apache.org/jira/browse/BEAM-7793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mike Pedersen updated BEAM-7793:
--------------------------------
Description:
If a BagState contains Rows, it appears to be empty when read from a timer
callback. GroupIntoBatches, for example, reads its BagState this way, so the
following test fails:
{code:java}
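import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class BagStateTests implements Serializable {
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void canBatchRows() {
    Schema schema = Schema.builder().addInt32Field("a").build();
    // One Row under key 0; the batch size of 10 is never reached, so the batch
    // can only be emitted when the end-of-window timer fires.
    PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
        .apply(Create
            .of(Row.withSchema(schema).addValue(1).build())
            .withType(TypeDescriptors.rows()))
        .setRowSchema(schema)
        .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
        .apply(GroupIntoBatches.ofSize(10));
    PAssert.that(pcoll).containsInAnyOrder(
        KV.of(0, Collections.singletonList(
            Row.withSchema(schema).addValue(1).build())));
    pipeline.run().waitUntilFinish();
  }
}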
{code}
The above uses GroupIntoBatches as an example, but the problem seems to affect
every DoFn that uses BagState.
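To make the expected behaviour concrete, here is a minimal plain-Java sketch of the buffer-then-flush pattern that the test relies on. It is a toy model only and does not use Beam: the class name BatchBufferModel and its methods are hypothetical, the per-key list stands in for the BagState, and onTimer() stands in for the timer callback. It assumes a batch is emitted either when it reaches the batch size or when the timer fires.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy, Beam-free model: buffer values per key, flush on batch size or on timer. */
final class BatchBufferModel<K, V> {
    private final int batchSize;
    // Per-key buffer; plays the role of the BagState in this model.
    private final Map<K, List<V>> bags = new HashMap<>();
    // Batches emitted so far, in emission order.
    private final List<Map.Entry<K, List<V>>> output = new ArrayList<>();

    BatchBufferModel(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Adds a value to the key's buffer and flushes once the batch is full. */
    void processElement(K key, V value) {
        List<V> bag = bags.computeIfAbsent(key, k -> new ArrayList<>());
        bag.add(value);
        if (bag.size() >= batchSize) {
            flush(key);
        }
    }

    /** Models the timer firing: everything still buffered must be emitted. */
    void onTimer() {
        for (K key : new ArrayList<>(bags.keySet())) {
            flush(key);
        }
    }

    private void flush(K key) {
        List<V> bag = bags.remove(key);
        if (bag != null && !bag.isEmpty()) {
            output.add(new AbstractMap.SimpleImmutableEntry<>(key, bag));
        }
    }

    List<Map.Entry<K, List<V>>> output() {
        return output;
    }

    public static void main(String[] args) {
        // Mirrors the failing test: one element, key 0, batch size 10.
        BatchBufferModel<Integer, String> model = new BatchBufferModel<>(10);
        model.processElement(0, "Row(a=1)");
        model.onTimer();
        System.out.println(model.output());
    }
}
```

In this model the single buffered element is still present when onTimer() runs, so exactly one batch is emitted for key 0. The reported bug corresponds to the buffer appearing empty at that point, so nothing is emitted.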
> BagState drops Rows when triggered by timer
> -------------------------------------------
>
> Key: BEAM-7793
> URL: https://issues.apache.org/jira/browse/BEAM-7793
> Project: Beam
> Issue Type: Bug
> Components: runner-direct, sdk-java-core
> Affects Versions: 2.13.0
> Reporter: Mike Pedersen
> Priority: Major