Thanks Dan.. today i was able to identify what the issue was.  Kafka
TopicPartition is marked Serializable in kafka-clients-0.9.0.1.jar.
Somehow i was pulling down kafka-clients-0.9.0.0.jar.

Regards
Sumit Chawla


On Sun, Aug 21, 2016 at 10:20 AM, Dan Halperin <[email protected]>
wrote:

> Explicit +Raghu
>
> On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit <[email protected]>
> wrote:
>
> > Hi All
> >
> > I am trying to use KafkaIO as unbounded source, but the translation is
> > failing.  I am using FlinkRunner for the pipe.  It complains about
> > the org.apache.kafka.common.TopicPartition being not-serializable.
> >
> > pipeline.apply(
> >         KafkaIO.read()
> >                 .withTopics(ImmutableList.of("test-topic))
> >                 .withBootstrapServers("localhost:9200")
> >
> > Is it a known issue?  Here is the full exception details.
> >
> > Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> > org.apache.beam.runners.flink.translation.wrappers.streaming.io.
> > UnboundedSourceWrapper@2a855331
> > not serializable
> > at
> > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(
> > ClosureCleaner.java:99)
> > ~[flink-java-1.0.3.jar:1.0.3]
> > at org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:61)
> > ~[flink-java-1.0.3.jar:1.0.3]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > clean(StreamExecutionEnvironment.java:1219)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1131)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1075)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1057)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> > tors$UnboundedReadSourceTranslator.translateNode(
> > FlinkStreamingTransformTranslators.java:281)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> > tors$UnboundedReadSourceTranslator.translateNode(
> > FlinkStreamingTransformTranslators.java:244)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> > or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> > or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:225)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:220)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:220)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformHierarchy.visit(
> > TransformHierarchy.java:104)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.
> > translate(FlinkPipelineTranslator.java:38)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironm
> ent.translate(
> > FlinkPipelineExecutionEnvironment.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> > FlinkPipelineRunner.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> > FlinkPipelineRunner.java:49)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.
> > main(PipelineMain.java:70)
> > ~[pipelinecommon-1.0.0.jar:?]
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > ~[?:1.8.0_92]
> > at
> > sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:
> > 62)
> > ~[?:1.8.0_92]
> > at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > DelegatingMethodAccessorImpl.java:43)
> > ~[?:1.8.0_92]
> > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
> > at
> > org.apache.flink.client.program.PackagedProgram.callMainMethod(
> > PackagedProgram.java:505)
> > ~[flink-clients_2.10-1.0.3.jar:1.0.3]
> > ... 9 more
> > Caused by: java.io.NotSerializableException:
> > org.apache.kafka.common.TopicPartition
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> > ~[?:1.8.0_92]
> > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> > ~[?:1.8.0_92]
> > at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
> > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
> > at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > DelegatingMethodAccessorImpl.java:43)
> > ~[?:1.8.0_92]
> > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
> > at java.io.ObjectStreamClass.invokeWriteObject(
> > ObjectStreamClass.java:1028)
> > ~[?:1.8.0_92]
> > at java.io.ObjectOutputStream.writeSerialData(
> > ObjectOutputStream.java:1496)
> > ~[?:1.8.0_92]
> > at
> > java.io.ObjectOutputStream.writeOrdinaryObject(
> > ObjectOutputStream.java:1432)
> > ~[?:1.8.0_92]
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > ~[?:1.8.0_92]
> > at
> > java.io.ObjectOutputStream.defaultWriteFields(
> > ObjectOutputStream.java:1548)
> > ~[?:1.8.0_92]
> > at java.io.ObjectOutputStream.writeSerialData(
> > ObjectOutputStream.java:1509)
> > ~[?:1.8.0_92]
> > at
> > java.io.ObjectOutputStream.writeOrdinaryObject(
> > ObjectOutputStream.java:1432)
> > ~[?:1.8.0_92]
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > ~[?:1.8.0_92]
> > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> > ~[?:1.8.0_92]
> > at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
> > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
> >
> >
> >
> > Regards
> > Sumit Chawla
> >
>

Reply via email to