Hi All

I am trying to use KafkaIO as unbounded source, but the translation is
failing.  I am using FlinkRunner for the pipe.  It complains about
the org.apache.kafka.common.TopicPartition being not-serializable.

pipeline.apply(
        KafkaIO.read()
                .withTopics(ImmutableList.of("test-topic))
                .withBootstrapServers("localhost:9200")

Is it a known issue?  Here is the full exception details.

Caused by: org.apache.flink.api.common.InvalidProgramException: Object
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@2a855331
not serializable
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
~[flink-java-1.0.3.jar:1.0.3]
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
~[flink-java-1.0.3.jar:1.0.3]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:281)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:106)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:106)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
at
com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.main(PipelineMain.java:70)
~[pipelinecommon-1.0.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_92]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_92]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_92]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
~[flink-clients_2.10-1.0.3.jar:1.0.3]
... 9 more
Caused by: java.io.NotSerializableException:
org.apache.kafka.common.TopicPartition
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_92]
at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_92]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_92]
at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]



Regards
Sumit Chawla

Reply via email to