Hi,

I hit another issue with the portable Flink runner. Long story short: reading from Kafka does not work on portable Flink. Because the portable Flink runner has issues with SDF [1], [2], I first had to fix the expansion service configuration so that the use_deprecated_read option can be passed to it. Once I was able to inject use_deprecated_read into the expansion service, I got an execution DAG that contains the UnboundedSource, but then more and more issues appeared (probably related to a missing LengthPrefixCoder somewhere, maybe at the output of the primitive Read).

I wanted to create a test for this and found that there actually is a ReadSourcePortableTest in FlinkRunner, but _it tests nothing_. The problem is that Read is converted to SDF, so this test exercises the SDF, not the Read transform. As a result, the Read transform is effectively untested and does not work.

I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that I could make the test fail and debug it, but I ran into

java.lang.IllegalArgumentException: PCollectionNodes 
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
 PCollection=unique_name: 
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: BOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced


which gave me the last knock-out. :)
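
For anyone reading along, here is how I read the exception above. This is a minimal sketch in plain Python, not Beam's actual code; the Transform class and both functions are hypothetical names made up for illustration. The message seems to mean that the pipeline graph contains a PCollection that some transform takes as input while no transform lists it as output. One way to end up there would be a rewrite that removes or replaces a producing transform without rewiring its consumers; whether that is what happens in my case I do not know.

```python
from dataclasses import dataclass, field


@dataclass
class Transform:
    """Toy stand-in for a pipeline transform (hypothetical, not a Beam class)."""
    name: str
    inputs: set = field(default_factory=set)    # ids of consumed PCollections
    outputs: set = field(default_factory=set)   # ids of produced PCollections


def unproduced_collections(transforms):
    """Return ids of PCollections that are consumed but produced by no transform."""
    produced = set()
    consumed = set()
    for t in transforms:
        produced |= t.outputs
        consumed |= t.inputs
    return consumed - produced


def validate(transforms):
    """Raise if the graph has a dangling input, mimicking the message above."""
    missing = unproduced_collections(transforms)
    if missing:
        raise ValueError(
            "PCollectionNodes %s were consumed but never produced" % sorted(missing)
        )


# A consistent graph: every consumed collection has a producer.
ok = [
    Transform("Read", outputs={"read.out"}),
    Transform("ParDo", inputs={"read.out"}, outputs={"pardo.out"}),
]
validate(ok)  # passes silently

# Dropping the producer while keeping its consumer leaves a dangling input.
broken = ok[1:]
try:
    validate(broken)
except ValueError as e:
    print(e)
```

If this reading is right, the thing to look for in the converted pipeline is which transform used to produce the ToSingletonIterables output and where it went.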

My current impression is that, starting with Beam 2.25.0, the portable FlinkRunner is not able to read from Kafka. Could someone give me a hint about what is wrong with using convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?

 Jan

[1] https://issues.apache.org/jira/browse/BEAM-11991

[2] https://issues.apache.org/jira/browse/BEAM-11998

[3] https://github.com/apache/beam/pull/15181
