@Dan: You're right that the PipelineOptions shouldn't be cached like this. In this particular wrapper, caching them was not even necessary.
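For reference, if a wrapper really does need to carry options across Java serialization, the route Dan mentions is to round-trip them through Jackson instead of relying on default Java serialization. A minimal sketch of that pattern (not the actual change I pushed; the holder class name is made up):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

/** Illustrative holder that carries PipelineOptions as JSON. */
public class SerializedPipelineOptionsHolder implements Serializable {

  // transient: excluded from default Java serialization,
  // because PipelineOptions itself is not Serializable.
  private transient PipelineOptions options;

  public SerializedPipelineOptionsHolder(PipelineOptions options) {
    this.options = options;
  }

  public PipelineOptions getOptions() {
    return options;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    // PipelineOptions is meant to be serialized through Jackson, so write it as JSON.
    out.writeUTF(new ObjectMapper().writeValueAsString(options));
  }

  private void readObject(ObjectInputStream in)
      throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    options = new ObjectMapper().readValue(in.readUTF(), PipelineOptions.class);
  }
}

In this wrapper, though, nothing actually needed the options at runtime, so dropping the cached field was the simpler fix.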
@Jiankang: I've pushed a fix to the repository with a few improvements. Could you please try again? You will have to recompile.

Thanks,
Max

On Thu, Mar 17, 2016 at 8:44 AM, Dan Halperin <[email protected]> wrote:
> +Max for the Flink Runner, and +Luke who wrote most of the initial code around PipelineOptions.
>
> The UnboundedFlinkSource is caching the `PipelineOptions` object, here:
> https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36
>
> I think this is a mismatch with how we intended them to be used. For example, the PipelineOptions may be changed by a Runner between graph construction time (when the UnboundedFlinkSource is created) and actual pipeline execution time. This is partially why, for example, PipelineOptions are provided by the Runner as an argument to functions like DoFn.startBundle, .processElement, and .finishBundle.
>
> PipelineOptions itself does not extend Serializable, and per the PipelineOptions documentation it looks like we intend for it to be serialized through Jackson rather than through Java serialization. I bet the Flink runner does this, and we probably just need to remove this cached PipelineOptions from the unbounded source.
>
> I'll let Luke and Max correct me on any or all of the above :)
>
> Thanks,
> Dan
>
> On Wed, Mar 16, 2016 at 10:57 PM, 刘见康 <[email protected]> wrote:
>> Hi guys,
>>
>> Failed to run KafkaWindowedWordCountExample with an "unable to serialize" exception; the stack trace is below:
>>
>> 16/03/17 13:49:09 INFO flink.FlinkPipelineRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 160 files. Enable logging at DEBUG level to see which files will be staged.
>> 16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating the required Streaming Environment.
>> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic metadata from broker localhost:9092 in try 0/3
>> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to read the following topics (with number of partitions):
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
>>   at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
>>   at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
>>   at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
>>   at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
>>   at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
>>   at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:497)
>>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>   at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
>>   ... 10 more
>>
>> I found there is a similar issue in flink-dataflow: https://github.com/dataArtisans/flink-dataflow/issues/8
>>
>> Do you have an idea about this error?
>>
>> Thanks
>> Jiankang
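P.S. To illustrate the execution-time point from Dan's mail: user code should not hold on to options captured at graph construction, but read them from the context the runner passes in. A small sketch (the DoFn below is made up purely for illustration):

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

public class OptionsAwareFn extends DoFn<String, String> {
  @Override
  public void processElement(ProcessContext c) {
    // The runner supplies the (possibly updated) options at execution time;
    // nothing about them is serialized with the function itself.
    PipelineOptions options = c.getPipelineOptions();
    c.output(c.element() + " [runner=" + options.getRunner().getSimpleName() + "]");
  }
}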
