+Max for the Flink Runner, and +Luke who wrote most of the initial code
around PipelineOptions.

The UnboundedFlinkSource is caching the `PipelineOptions` object, here:
https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36
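
For reference, here is a minimal, self-contained sketch of the failure mode
(the class name CachedOptionsRepro is hypothetical): any Serializable object
that caches a PipelineOptions instance will fail Java serialization, because
the dynamic proxy backing PipelineOptions delegates to a
ProxyInvocationHandler, which is not Serializable. That is exactly the
"Caused by" in the stack trace below.

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    // Hypothetical stand-in for a source that caches options at construction time.
    public class CachedOptionsRepro implements Serializable {
      private final PipelineOptions options = PipelineOptionsFactory.create();

      public static void main(String[] args) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        // Throws java.io.NotSerializableException:
        //   com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
        out.writeObject(new CachedOptionsRepro());
      }
    }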

I think this is a mismatch with how PipelineOptions is intended to be used.
For example, a runner may change the PipelineOptions between graph
construction time (when the UnboundedFlinkSource is created) and actual
pipeline execution time. This is partly why the runner supplies
PipelineOptions at execution time, through the context passed to
DoFn.startBundle, .processElement, and .finishBundle.
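
As a minimal sketch of the intended pattern (MyFn is hypothetical, written
against the com.google.cloud.dataflow.sdk namespace that appears in the
stack trace below): read the options from the context at execution time
instead of caching them in a field at construction time.

    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;

    class MyFn extends DoFn<String, String> {
      // No cached PipelineOptions field here: a field would freeze the options
      // at graph construction time and, being backed by a non-serializable
      // proxy, would also break Java serialization of the DoFn.

      @Override
      public void processElement(ProcessContext c) {
        // The runner supplies the options that are current at execution time.
        PipelineOptions options = c.getPipelineOptions();
        // ... consult options here as needed ...
        c.output(c.element());
      }
    }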

PipelineOptions itself does not extend Serializable, and per the
PipelineOptions documentation it looks like we intend for it to be
serialized through Jackson rather than through Java serialization. I bet
the Flink runner does this, and we probably just need to remove this cached
PipelineOptions from the unbounded source.
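
A minimal sketch of that Jackson round trip (assuming a plain Jackson 2
ObjectMapper is enough, as the PipelineOptions javadoc suggests; the class
name OptionsJacksonRoundTrip is hypothetical):

    import com.fasterxml.jackson.databind.ObjectMapper;

    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class OptionsJacksonRoundTrip {
      public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();

        // PipelineOptions carries its own Jackson (de)serializers, so a plain
        // ObjectMapper can round-trip it; Java serialization cannot.
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(options);
        PipelineOptions restored = mapper.readValue(json, PipelineOptions.class);
        System.out.println(json);
      }
    }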

I'll let Luke and Max correct me on any or all of the above :)

Thanks,
Dan

On Wed, Mar 16, 2016 at 10:57 PM, 刘见康 <[email protected]> wrote:

> Hi guys,
>
> Failed to run KafkaWindowedWordCountExample with Unable to serialize
> exception, the stack exception as below:
>
> 16/03/17 13:49:09 INFO flink.FlinkPipelineRunner:
> PipelineOptions.filesToStage was not specified. Defaulting to files from
> the classpath: will stage 160 files. Enable logging at DEBUG level to see
> which files will be staged.
> 16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating
> the required Streaming Environment.
> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic
> metadata from broker localhost:9092 in try 0/3
> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to
> read the following topics (with number of partitions):
> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
>     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
>     at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
>     at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
>     at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
>     at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
>     at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
> ... 10 more
>
> I found there is a similar issue in flink-dataflow
> https://github.com/dataArtisans/flink-dataflow/issues/8.
>
> Do you have an idea about this error?
>
> Thanks
> Jiankang
>
