Thanks Dan.. today i was able to identify what the issue was. Kafka TopicPartition is marked Serializable in kafka-clients-0.9.0.1.jar. Somehow i was pulling down kafka-clients-0.9.0.0.jar.
Regards Sumit Chawla On Sun, Aug 21, 2016 at 10:20 AM, Dan Halperin <[email protected]> wrote: > Explicit +Raghu > > On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit <[email protected]> > wrote: > > > Hi All > > > > I am trying to use KafkaIO as unbounded source, but the translation is > > failing. I am using FlinkRunner for the pipe. It complains about > > the org.apache.kafka.common.TopicPartition being not-serializable. > > > > pipeline.apply( > > KafkaIO.read() > > .withTopics(ImmutableList.of("test-topic)) > > .withBootstrapServers("localhost:9200") > > > > Is it a known issue? Here is the full exception details. > > > > Caused by: org.apache.flink.api.common.InvalidProgramException: Object > > org.apache.beam.runners.flink.translation.wrappers.streaming.io. > > UnboundedSourceWrapper@2a855331 > > not serializable > > at > > org.apache.flink.api.java.ClosureCleaner.ensureSerializable( > > ClosureCleaner.java:99) > > ~[flink-java-1.0.3.jar:1.0.3] > > at org.apache.flink.api.java.ClosureCleaner.clean( > ClosureCleaner.java:61) > > ~[flink-java-1.0.3.jar:1.0.3] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > clean(StreamExecutionEnvironment.java:1219) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > addSource(StreamExecutionEnvironment.java:1131) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > addSource(StreamExecutionEnvironment.java:1075) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > addSource(StreamExecutionEnvironment.java:1057) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla > > tors$UnboundedReadSourceTranslator.translateNode( > > FlinkStreamingTransformTranslators.java:281) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla > > tors$UnboundedReadSourceTranslator.translateNode( > > FlinkStreamingTransformTranslators.java:244) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat > > or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat > > or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformTreeNode.visit( > > TransformTreeNode.java:225) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformTreeNode.visit( > > TransformTreeNode.java:220) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformTreeNode.visit( > > TransformTreeNode.java:220) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformHierarchy.visit( > > TransformHierarchy.java:104) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator. > > translate(FlinkPipelineTranslator.java:38) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironm > ent.translate( > > FlinkPipelineExecutionEnvironment.java:106) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.FlinkPipelineRunner.run( > > FlinkPipelineRunner.java:106) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.FlinkPipelineRunner.run( > > FlinkPipelineRunner.java:49) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > com.cisco.ndp.pipeline.common.runner.flink.PipelineMain. > > main(PipelineMain.java:70) > > ~[pipelinecommon-1.0.0.jar:?] > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > ~[?:1.8.0_92] > > at > > sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java: > > 62) > > ~[?:1.8.0_92] > > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke( > > DelegatingMethodAccessorImpl.java:43) > > ~[?:1.8.0_92] > > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92] > > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod( > > PackagedProgram.java:505) > > ~[flink-clients_2.10-1.0.3.jar:1.0.3] > > ... 9 more > > Caused by: java.io.NotSerializableException: > > org.apache.kafka.common.TopicPartition > > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > > ~[?:1.8.0_92] > > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > > ~[?:1.8.0_92] > > at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92] > > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?] > > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke( > > DelegatingMethodAccessorImpl.java:43) > > ~[?:1.8.0_92] > > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92] > > at java.io.ObjectStreamClass.invokeWriteObject( > > ObjectStreamClass.java:1028) > > ~[?:1.8.0_92] > > at java.io.ObjectOutputStream.writeSerialData( > > ObjectOutputStream.java:1496) > > ~[?:1.8.0_92] > > at > > java.io.ObjectOutputStream.writeOrdinaryObject( > > ObjectOutputStream.java:1432) > > ~[?:1.8.0_92] > > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > > ~[?:1.8.0_92] > > at > > java.io.ObjectOutputStream.defaultWriteFields( > > ObjectOutputStream.java:1548) > > ~[?:1.8.0_92] > > at java.io.ObjectOutputStream.writeSerialData( > > ObjectOutputStream.java:1509) > > ~[?:1.8.0_92] > > at > > java.io.ObjectOutputStream.writeOrdinaryObject( > > ObjectOutputStream.java:1432) > > ~[?:1.8.0_92] > > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > > ~[?:1.8.0_92] > > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > > ~[?:1.8.0_92] > > at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92] > > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?] > > > > > > > > Regards > > Sumit Chawla > > >
