+Max for the Flink Runner, and +Luke, who wrote most of the initial code around PipelineOptions.
The UnboundedFlinkSource is caching the `PipelineOptions` object here:

https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36

I think this is a mismatch with how we intended PipelineOptions to be used. For example, a Runner may change the PipelineOptions between graph construction time (when the UnboundedFlinkSource is created) and actual pipeline execution time. This is partly why the Runner provides PipelineOptions as an argument to functions like DoFn.startBundle, .processElement, and .finishBundle, rather than expecting user code to capture them at construction time.

PipelineOptions itself does not extend Serializable, and per the PipelineOptions documentation we intend for it to be serialized through Jackson rather than through Java serialization. I bet the Flink runner already does this, and we probably just need to remove the cached PipelineOptions from the unbounded source.
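To make the construction-time vs. execution-time point concrete, here is a minimal sketch of reading the options through the context the Runner hands you at execution time. The options interface and DoFn below are hypothetical, purely for illustration:

    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;

    // Hypothetical user-defined options, for illustration only.
    interface MyKafkaOptions extends PipelineOptions {
      String getKafkaTopic();
      void setKafkaTopic(String value);
    }

    class TagWithTopicFn extends DoFn<String, String> {
      @Override
      public void processElement(ProcessContext c) {
        // Ask the Runner for the *current* options at execution time,
        // instead of capturing them in a field at graph construction time.
        MyKafkaOptions options = c.getPipelineOptions().as(MyKafkaOptions.class);
        c.output(options.getKafkaTopic() + ": " + c.element());
      }
    }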
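For the serialization half, a rough sketch of what the Jackson route could look like; the wrapper class and its name are made up, so treat this as one possible shape rather than the actual fix:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import java.io.IOException;
    import java.io.Serializable;

    // Hypothetical wrapper: Java serialization carries only the Jackson
    // JSON bytes; the non-Serializable options proxy is rebuilt lazily
    // on the worker side.
    class JsonSerializedOptions implements Serializable {
      private final byte[] json;
      private transient PipelineOptions options;  // the proxy itself is not Serializable

      JsonSerializedOptions(PipelineOptions options) {
        try {
          this.json = new ObjectMapper().writeValueAsBytes(options);
        } catch (IOException e) {
          throw new IllegalStateException("Unable to serialize PipelineOptions", e);
        }
      }

      PipelineOptions get() {
        if (options == null) {
          try {
            // Rebuild the proxy from the JSON form after Java deserialization.
            options = new ObjectMapper().readValue(json, PipelineOptions.class);
          } catch (IOException e) {
            throw new IllegalStateException("Unable to deserialize PipelineOptions", e);
          }
        }
        return options;
      }
    }

A source like UnboundedFlinkSource could then hold one of these (or, better, nothing at all) instead of the raw PipelineOptions field.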
I'll let Luke and Max correct me on any or all of the above :)

Thanks,
Dan

On Wed, Mar 16, 2016 at 10:57 PM, 刘见康 <[email protected]> wrote:
> Hi guys,
>
> KafkaWindowedWordCountExample fails with an "unable to serialize"
> exception; the stack trace is below:
>
> 16/03/17 13:49:09 INFO flink.FlinkPipelineRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 160 files. Enable logging at DEBUG level to see which files will be staged.
> 16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating the required Streaming Environment.
> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic metadata from broker localhost:9092 in try 0/3
> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to read the following topics (with number of partitions):
> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
>     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
>     at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
>     at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
>     at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
>     at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
>     at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
>     ... 10 more
>
> I found there is a similar issue in flink-dataflow:
> https://github.com/dataArtisans/flink-dataflow/issues/8
>
> Do you have an idea about this error?
>
> Thanks,
> Jiankang
