[ 
https://issues.apache.org/jira/browse/BEAM-7793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890739#comment-16890739
 ] 

Mike Pedersen commented on BEAM-7793:
-------------------------------------

Seems to also be a problem on Flink, although it fails with an exception 
instead of not outputting anything:

{noformat}
org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of 
different types. 
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(VarIntCoder,IterableCoder(ValueInSingleWindow$Coder(KvCoder(VarIntCoder,IterableCoder(SchemaCoder:
 Schema: Fields:
Field{name=a, description=, type=FieldType{typeName=INT32, nullable=false, 
logicalType=null, collectionElementType=null, mapKeyType=null, 
mapValueType=null, rowSchema=null, metadata={}}}
  UUID: 660118c8-32c4-4b28-8bad-20eaf20b0478 delegateCoder: 
org.apache.beam.sdk.coders.Coder$ByteBuddy$d2mfLCeJ@4ce94d2f)),GlobalWindow$Coder))),GlobalWindow$Coder)},
 
input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(VarIntCoder,IterableCoder(ValueInSingleWindow$Coder(KvCoder(VarIntCoder,IterableCoder(SchemaCoder:
 Schema: Fields:
Field{name=a, description=, type=FieldType{typeName=INT32, nullable=false, 
logicalType=null, collectionElementType=null, mapKeyType=null, 
mapValueType=null, rowSchema=null, metadata={}}}
  UUID: 660118c8-32c4-4b28-8bad-20eaf20b0478 delegateCoder: 
org.apache.beam.sdk.coders.Coder$ByteBuddy$d2mfLCeJ@4ce94d2f)),GlobalWindow$Coder))),GlobalWindow$Coder)}

        at 
org.apache.flink.api.java.operators.UnionOperator.<init>(UnionOperator.java:48)
        at org.apache.flink.api.java.DataSet.union(DataSet.java:1242)
        at 
org.apache.beam.runners.flink.FlinkBatchTransformTranslators$FlattenPCollectionTranslatorBatch.translateNode(FlinkBatchTransformTranslators.java:689)
        at 
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator.applyBatchTransform(FlinkBatchPipelineTranslator.java:113)
        at 
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator.visitPrimitiveTransform(FlinkBatchPipelineTranslator.java:97)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
        at 
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at 
org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
        at 
org.apache.beam.runners.flink.FlinkBatchPipelineTranslator.translate(FlinkBatchPipelineTranslator.java:49)
        at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:116)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
{noformat}


> BagState drops Rows when triggered by timer
> -------------------------------------------
>
>                 Key: BEAM-7793
>                 URL: https://issues.apache.org/jira/browse/BEAM-7793
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct, sdk-java-core
>    Affects Versions: 2.13.0
>            Reporter: Mike Pedersen
>            Priority: Major
>
> If a BagState contains Rows, it appears to be empty when read in a timer. 
> This is for example used by GroupIntoBatches, causing the following to fail, 
> as the GroupIntoBatches transform will not output anything:
> {code:java}
> public class BagStateTests implements Serializable {
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>     @Test
>     public void canBatchRows() {
>         Schema schema = Schema.builder().addInt32Field("a").build();
>         PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
>                 .apply(Create
>                         .of(Row.withSchema(schema).addValue(1).build())
>                         .withType(TypeDescriptors.rows()))
>                 .setRowSchema(schema)
>                 .apply(WithKeys.<Integer, 
> Row>of(0).withKeyType(TypeDescriptors.integers()))
>                 .apply(GroupIntoBatches.ofSize(10));
>         PAssert.that(pcoll).containsInAnyOrder(
>                 KV.of(0, Collections.singletonList(
>                         Row.withSchema(schema).addValue(1).build()
>                 )));
>         pipeline.run().waitUntilFinish();
>     }
> }
> {code}
> The above uses GroupIntoBatches as an example, but the problem seem to be 
> with all DoFn having BagState.
> Also, the above test might not be the best since it assumes the iterable 
> outputted by GroupIntoBatches is a list. Regardless, the test should not fail 
> due to no values in the output collection.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to