Explicit +Raghu
On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit <[email protected]>
wrote:
> Hi All
>
> I am trying to use KafkaIO as an unbounded source, but the translation is
> failing. I am using FlinkRunner for the pipeline. It complains about
> org.apache.kafka.common.TopicPartition not being serializable.
>
> pipeline.apply(
> KafkaIO.read()
> .withTopics(ImmutableList.of("test-topic"))
> .withBootstrapServers("localhost:9200")
>
> Is it a known issue? Here are the full exception details.
>
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.
> UnboundedSourceWrapper@2a855331
> not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(
> ClosureCleaner.java:99)
> ~[flink-java-1.0.3.jar:1.0.3]
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> ~[flink-java-1.0.3.jar:1.0.3]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> clean(StreamExecutionEnvironment.java:1219)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> addSource(StreamExecutionEnvironment.java:1131)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> addSource(StreamExecutionEnvironment.java:1075)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> addSource(StreamExecutionEnvironment.java:1057)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> tors$UnboundedReadSourceTranslator.translateNode(
> FlinkStreamingTransformTranslators.java:281)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> tors$UnboundedReadSourceTranslator.translateNode(
> FlinkStreamingTransformTranslators.java:244)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(
> TransformTreeNode.java:225)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(
> TransformTreeNode.java:220)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(
> TransformTreeNode.java:220)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(
> TransformHierarchy.java:104)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.
> translate(FlinkPipelineTranslator.java:38)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(
> FlinkPipelineExecutionEnvironment.java:106)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> FlinkPipelineRunner.java:106)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> FlinkPipelineRunner.java:49)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.
> main(PipelineMain.java:70)
> ~[pipelinecommon-1.0.0.jar:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_92]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> ~[?:1.8.0_92]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_92]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:505)
> ~[flink-clients_2.10-1.0.3.jar:1.0.3]
> ... 9 more
> Caused by: java.io.NotSerializableException:
> org.apache.kafka.common.TopicPartition
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_92]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_92]
> at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
> at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_92]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
> at java.io.ObjectStreamClass.invokeWriteObject(
> ObjectStreamClass.java:1028)
> ~[?:1.8.0_92]
> at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1496)
> ~[?:1.8.0_92]
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
> ~[?:1.8.0_92]
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> ~[?:1.8.0_92]
> at
> java.io.ObjectOutputStream.defaultWriteFields(
> ObjectOutputStream.java:1548)
> ~[?:1.8.0_92]
> at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1509)
> ~[?:1.8.0_92]
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
> ~[?:1.8.0_92]
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> ~[?:1.8.0_92]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_92]
> at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
> at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
>
>
>
> Regards
> Sumit Chawla
>