@Max:
Thanks for your quick fix, this serializable exception has been solved.
However, it reported another one:
16/03/17 20:14:23 INFO flink.FlinkPipelineRunner:
PipelineOptions.filesToStage was not specified. Defaulting to files from
the classpath: will stage 158 files. Enable logging at DEBUG level to see
which files will be staged.
16/03/17 20:14:23 INFO flink.FlinkPipelineExecutionEnvironment: Creating
the required Streaming Environment.
16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumer08: Trying to get topic
metadata from broker localhost:9092 in try 0/3
16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to
read the following topics (with number of partitions):
Exception in thread "main" java.lang.RuntimeException: Flink Sources are
supported only when running with the FlinkPipelineRunner.
at
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource.getDefaultOutputCoder(UnboundedFlinkSource.java:71)
at
com.google.cloud.dataflow.sdk.io.Read$Unbounded.getDefaultOutputCoder(Read.java:230)
at
com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:294)
at
com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:309)
at
com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:167)
at
com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
at
com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
at
com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at
com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at
org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:127)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Dive into the UnboundedFlinkSource class, it just like a simple class imply
the UnboundedSource interface with throw RuntimeException.
I just wonder if this Kafka Streaming example is runnable?

Thanks
Jiankang


On Thu, Mar 17, 2016 at 7:35 PM, Maximilian Michels <[email protected]> wrote:

> @Dan: You're right that the PipelineOptions shouldn't be cached like
> this. In this particular wrapper, it was not even necessary.
>
> @Jiankang: I've pushed a fix to the repository with a few
> improvements. Could you please try again? You will have to recompile.
>
> Thanks,
> Max
>
> On Thu, Mar 17, 2016 at 8:44 AM, Dan Halperin <[email protected]> wrote:
> > +Max for the Flink Runner, and +Luke who wrote most of the initial code
> > around PipelineOptions.
> >
> > The UnboundedFlinkSource is caching the `PipelineOptions` object, here:
> >
> https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36
> >
> > I think this is a mismatch with how we intended them to be used. For
> > example, the PipelineOptions may be changed by a Runner between graph
> > construction time (when the UnboundedFlinkSource is created) and actual
> > pipeline execution time. This is partially why, for example,
> PipelineOptions
> > are provided by the Runner as an argument to functions like
> > DoFn.startBundle, .processElement, and .finishBundle.
> >
> > PipelineOptions itself does not extend Serializable, and per the
> > PipelineOptions documentation it looks like we intend for it to be
> > serialized through Jackson rather than through Java serialization. I bet
> the
> > Flink runner does this, and we probably just need to remove this cached
> > PipelineOptions from the unbounded source.
> >
> > I'll let Luke and Max correct me on any or all of the above :)
> >
> > Thanks,
> > Dan
> >
> > On Wed, Mar 16, 2016 at 10:57 PM, 刘见康 <[email protected]> wrote:
> >>
> >> Hi guys,
> >>
> >> Failed to run KafkaWindowedWordCountExample with Unable to serialize
> >> exception, the stack exception as below:
> >>
> >> 16/03/17 13:49:09 INFO flink.FlinkPipelineRunner:
> >> PipelineOptions.filesToStage was not specified. Defaulting to files from
> >> the classpath: will stage 160 files. Enable logging at DEBUG level to
> see
> >> which files will be staged.
> >> 16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating
> >> the required Streaming Environment.
> >> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic
> >> metadata from broker localhost:9092 in try 0/3
> >> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going
> to
> >> read the following topics (with number of partitions):
> >> Exception in thread "main" java.lang.IllegalArgumentException: unable to
> >> serialize
> >>
> >>
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
> >> at
> >>
> >>
> com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
> >> at
> >>
> >>
> com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
> >> at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
> >> at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
> >> at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
> >> at
> >>
> >>
> org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> >>
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:497)
> >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> >> Caused by: java.io.NotSerializableException:
> >> com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> >> at
> >>
> >>
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >> at
> >> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >> at
> >>
> >>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >> at
> >>
> >>
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >> at
> >> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >> at
> >>
> >>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >> at
> >>
> >>
> com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
> >> ... 10 more
> >>
> >> I found there is a similar issue in flink-dataflow
> >> https://github.com/dataArtisans/flink-dataflow/issues/8.
> >>
> >> Do you have an idea about this error?
> >>
> >> Thanks
> >> Jiankang
> >
> >
>

Reply via email to