Thanks, Sumit. We should probably update our Kafka dependency so that kafka-clients 0.9.0.0 is excluded, since TopicPartition is not Serializable in that version.
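
For anyone else who hits this, here is a rough sketch of a classpath check that reports whether a class implements Serializable and which jar it was loaded from. The class name SerializableCheck and its helper methods are made up for illustration and are not part of Beam or Kafka; it only uses standard JDK reflection calls.

```java
import java.io.Serializable;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Beam, Flink, or Kafka.
public class SerializableCheck {

    /** Returns true if the given class implements java.io.Serializable. */
    static boolean isSerializable(Class<?> cls) {
        return Serializable.class.isAssignableFrom(cls);
    }

    /** Returns the jar or directory the class was loaded from, or "(unknown)" if the JVM does not report one. */
    static String loadedFrom(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(unknown)"; // typical for classes that ship with the JDK
        }
        return src.getLocation().toString();
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace; pass another name as the first argument.
        String name = args.length > 0 ? args[0] : "org.apache.kafka.common.TopicPartition";
        try {
            Class<?> cls = Class.forName(name);
            System.out.println(name + " serializable: " + isSerializable(cls));
            System.out.println("loaded from: " + loadedFrom(cls));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Running it with the same classpath as the pipeline jar should show whether kafka-clients-0.9.0.0.jar is the one being picked up.
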

On Sun, Aug 21, 2016 at 5:17 PM, Chawla,Sumit <[email protected]> wrote:

> Thanks Dan. Today I was able to identify what the issue was. Kafka
> TopicPartition is marked Serializable in kafka-clients-0.9.0.1.jar.
> Somehow I was pulling down kafka-clients-0.9.0.0.jar.
>
> Regards
> Sumit Chawla
>
> On Sun, Aug 21, 2016 at 10:20 AM, Dan Halperin <[email protected]> wrote:
>
>> Explicit +Raghu
>>
>> On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit <[email protected]> wrote:
>>
>> > Hi All
>> >
>> > I am trying to use KafkaIO as unbounded source, but the translation is
>> > failing. I am using FlinkRunner for the pipe. It complains about
>> > the org.apache.kafka.common.TopicPartition being not-serializable.
>> >
>> > pipeline.apply(
>> >     KafkaIO.read()
>> >         .withTopics(ImmutableList.of("test-topic"))
>> >         .withBootstrapServers("localhost:9200")
>> >
>> > Is it a known issue? Here are the full exception details.
>> >
>> > Caused by: org.apache.flink.api.common.InvalidProgramException: Object org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@2a855331 not serializable
>> >   at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99) ~[flink-java-1.0.3.jar:1.0.3]
>> >   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61) ~[flink-java-1.0.3.jar:1.0.3]
>> >   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219) ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> >   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131) ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> >   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075) ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> >   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057) ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> >   at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:281) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225) ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220) ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220) ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104) ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292) ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:106) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:106) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49) ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> >   at com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.main(PipelineMain.java:70) ~[pipelinecommon-1.0.0.jar:?]
>> >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_92]
>> >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_92]
>> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_92]
>> >   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
>> >   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) ~[flink-clients_2.10-1.0.3.jar:1.0.3]
>> >   ... 9 more
>> > Caused by: java.io.NotSerializableException: org.apache.kafka.common.TopicPartition
>> >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
>> >   at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
>> >   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
>> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_92]
>> >   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
>> >   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>> >   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
>> >   at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
>> >   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
>> >
>> > Regards
>> > Sumit Chawla
