Thanks Sumit.

We should probably update our Kafka dependency to make sure 0.9.0.0 is
excluded.

On Sun, Aug 21, 2016 at 5:17 PM, Chawla,Sumit <[email protected]>
wrote:

> Thanks Dan. Today I was able to identify the issue: Kafka's
> TopicPartition is marked Serializable in kafka-clients-0.9.0.1.jar,
> but somehow I was pulling in kafka-clients-0.9.0.0.jar.
>
> Regards
> Sumit Chawla
>
>
> On Sun, Aug 21, 2016 at 10:20 AM, Dan Halperin <
> [email protected]> wrote:
>
>> Explicit +Raghu
>>
>> On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit <[email protected]>
>> wrote:
>>
>> > Hi All
>> >
>> > I am trying to use KafkaIO as an unbounded source, but the translation
>> > is failing.  I am using FlinkRunner for the pipeline.  It complains that
>> > org.apache.kafka.common.TopicPartition is not serializable.
>> >
>> > pipeline.apply(
>> >         KafkaIO.read()
>> >                 .withTopics(ImmutableList.of("test-topic"))
>> >                 .withBootstrapServers("localhost:9200")
>> >
>> > Is this a known issue?  Here are the full exception details.
>> >
>> > Caused by: org.apache.flink.api.common.InvalidProgramException: Object
>> > org.apache.beam.runners.flink.translation.wrappers.streaming.io.
>> > UnboundedSourceWrapper@2a855331
>> > not serializable
>> > at
>> > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(
>> > ClosureCleaner.java:99)
>> > ~[flink-java-1.0.3.jar:1.0.3]
>> > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
>> > ~[flink-java-1.0.3.jar:1.0.3]
>> > at
>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
>> > clean(StreamExecutionEnvironment.java:1219)
>> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> > at
>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
>> > addSource(StreamExecutionEnvironment.java:1131)
>> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> > at
>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
>> > addSource(StreamExecutionEnvironment.java:1075)
>> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> > at
>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
>> > addSource(StreamExecutionEnvironment.java:1057)
>> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
>> > at
>> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(
>> > FlinkStreamingTransformTranslators.java:281)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(
>> > FlinkStreamingTransformTranslators.java:244)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
>> > TransformTreeNode.java:225)
>> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
>> > TransformTreeNode.java:220)
>> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
>> > TransformTreeNode.java:220)
>> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.sdk.runners.TransformHierarchy.visit(
>> > TransformHierarchy.java:104)
>> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.
>> > translate(FlinkPipelineTranslator.java:38)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(
>> > FlinkPipelineExecutionEnvironment.java:106)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.runners.flink.FlinkPipelineRunner.run(
>> > FlinkPipelineRunner.java:106)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > org.apache.beam.runners.flink.FlinkPipelineRunner.run(
>> > FlinkPipelineRunner.java:49)
>> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
>> > at
>> > com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.
>> > main(PipelineMain.java:70)
>> > ~[pipelinecommon-1.0.0.jar:?]
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > ~[?:1.8.0_92]
>> > at
>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > ~[?:1.8.0_92]
>> > at
>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> > DelegatingMethodAccessorImpl.java:43)
>> > ~[?:1.8.0_92]
>> > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
>> > at
>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(
>> > PackagedProgram.java:505)
>> > ~[flink-clients_2.10-1.0.3.jar:1.0.3]
>> > ... 9 more
>> > Caused by: java.io.NotSerializableException:
>> > org.apache.kafka.common.TopicPartition
>> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> > ~[?:1.8.0_92]
>> > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> > ~[?:1.8.0_92]
>> > at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
>> > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
>> > at
>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> > DelegatingMethodAccessorImpl.java:43)
>> > ~[?:1.8.0_92]
>> > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
>> > at java.io.ObjectStreamClass.invokeWriteObject(
>> > ObjectStreamClass.java:1028)
>> > ~[?:1.8.0_92]
>> > at java.io.ObjectOutputStream.writeSerialData(
>> > ObjectOutputStream.java:1496)
>> > ~[?:1.8.0_92]
>> > at
>> > java.io.ObjectOutputStream.writeOrdinaryObject(
>> > ObjectOutputStream.java:1432)
>> > ~[?:1.8.0_92]
>> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> > ~[?:1.8.0_92]
>> > at
>> > java.io.ObjectOutputStream.defaultWriteFields(
>> > ObjectOutputStream.java:1548)
>> > ~[?:1.8.0_92]
>> > at java.io.ObjectOutputStream.writeSerialData(
>> > ObjectOutputStream.java:1509)
>> > ~[?:1.8.0_92]
>> > at
>> > java.io.ObjectOutputStream.writeOrdinaryObject(
>> > ObjectOutputStream.java:1432)
>> > ~[?:1.8.0_92]
>> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> > ~[?:1.8.0_92]
>> > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> > ~[?:1.8.0_92]
>> > at java.util.ArrayList.writeObject(ArrayList.java:762) ~[?:1.8.0_92]
>> > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[?:?]
>> >
>> >
>> >
>> > Regards
>> > Sumit Chawla
>> >
>>
>
>
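
For anyone hitting the same error: the diagnosis above (TopicPartition is
Serializable in kafka-clients 0.9.0.1, but an older jar was being pulled in)
can be checked from inside the job's classpath. The following is only a
rough sketch, not something from the Beam or Kafka code bases. The class name
SerializableCheck and its two helper methods are made up for illustration; it
uses only standard JDK reflection calls.

```java
import java.io.Serializable;
import java.security.CodeSource;

public class SerializableCheck {

    /** Returns true if the given class implements java.io.Serializable. */
    public static boolean isSerializable(Class<?> cls) {
        return Serializable.class.isAssignableFrom(cls);
    }

    /**
     * Returns the jar or directory the class was loaded from, or "unknown"
     * when the JVM does not expose a code source (typical for JDK classes).
     */
    public static String loadedFrom(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "unknown";
        }
        return src.getLocation().toString();
    }

    public static void main(String[] args) {
        // Defaults to the class named in the exception; pass another name to check it instead.
        String name = args.length > 0 ? args[0] : "org.apache.kafka.common.TopicPartition";
        try {
            Class<?> cls = Class.forName(name);
            System.out.println(name + " serializable: " + isSerializable(cls));
            System.out.println("loaded from: " + loadedFrom(cls));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Run it with the same classpath as the pipeline. If the "loaded from" line
points at kafka-clients-0.9.0.0.jar, that would match what Sumit found.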

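The second "Caused by" in the trace shows Java serialization walking into an
ArrayList and failing on one of its elements. A small stand-alone sketch of
that failure mode follows, assuming nothing about Beam or Flink internals.
PlainPartition and SourceLike are hypothetical stand-ins (not the real
TopicPartition or UnboundedSourceWrapper), and firstNonSerializable is a
made-up helper name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class NotSerializableDemo {

    /** Hypothetical stand-in for a class that does not implement Serializable. */
    public static class PlainPartition {
        public final String topic;
        public final int partition;

        public PlainPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Hypothetical stand-in for a serializable wrapper that holds a list of partitions. */
    public static class SourceLike implements Serializable {
        private static final long serialVersionUID = 1L;
        public final List<Object> partitions = new ArrayList<>();
    }

    /**
     * Tries to Java-serialize the object. Returns null on success, or the
     * message of the NotSerializableException (the offending class name by
     * default) when some reachable object is not Serializable.
     */
    public static String firstNonSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SourceLike source = new SourceLike();
        source.partitions.add(new PlainPartition("test-topic", 0));
        System.out.println("blocked by: " + firstNonSerializable(source));
    }
}
```

The wrapper itself is Serializable here; it is the element inside the list
that blocks serialization, which is the same shape as the trace above.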