@Dan: You're right that the PipelineOptions shouldn't be cached like
this. In this particular wrapper, it was not even necessary.
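
For context: a PipelineOptions instance is a dynamic proxy backed by
ProxyInvocationHandler, which is not java.io.Serializable, so any
Serializable wrapper that keeps one in a field fails exactly as in the
stack trace below. A rough throwaway sketch of the difference (Jackson
works, Java serialization does not), relying on the Jackson annotations
the SDK puts on PipelineOptions; the class name here is made up:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

// Throwaway sketch, not part of any SDK.
public class OptionsSerializationCheck {
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();

    // Jackson round trip: PipelineOptions carries Jackson annotations for
    // this purpose, so this is the supported way to ship options around.
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(options);
    PipelineOptions restored = mapper.readValue(json, PipelineOptions.class);
    System.out.println("Jackson round trip OK: " + restored);

    // Java serialization: throws java.io.NotSerializableException:
    // ProxyInvocationHandler -- the same failure the stack trace below
    // shows when a Serializable source caches options in a field.
    try (ObjectOutputStream out =
        new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(options);
    }
  }
}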

@Jiankang: I've pushed a fix to the repository with a few
improvements. Could you please try again? You will have to recompile.
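
For anyone hitting the same thing with their own sources or DoFns: the
pattern Dan describes below, reading the options from the runner-provided
context at execution time instead of caching them at graph-construction
time, looks roughly like this (MyOptions and FilterLinesFn are just
illustrative names, not anything in the SDK):

import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Illustrative options interface for the sketch; define your own.
interface MyOptions extends PipelineOptions {
  @Default.Boolean(false)
  boolean getKeepEmptyLines();
  void setKeepEmptyLines(boolean value);
}

class FilterLinesFn extends DoFn<String, String> {
  @Override
  public void processElement(ProcessContext c) {
    // Fetched at execution time from the context the runner provides,
    // so no PipelineOptions field has to live in the serialized DoFn.
    MyOptions opts = c.getPipelineOptions().as(MyOptions.class);
    if (opts.getKeepEmptyLines() || !c.element().isEmpty()) {
      c.output(c.element());
    }
  }
}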

Thanks,
Max

On Thu, Mar 17, 2016 at 8:44 AM, Dan Halperin <[email protected]> wrote:
> +Max for the Flink Runner, and +Luke who wrote most of the initial code
> around PipelineOptions.
>
> The UnboundedFlinkSource is caching the `PipelineOptions` object, here:
> https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36
>
> I think this is a mismatch with how we intended them to be used. For
> example, the PipelineOptions may be changed by a Runner between graph
> construction time (when the UnboundedFlinkSource is created) and actual
> pipeline execution time. This is partially why, for example, PipelineOptions
> are provided by the Runner as an argument to functions like
> DoFn.startBundle, .processElement, and .finishBundle.
>
> PipelineOptions itself does not extend Serializable, and per the
> PipelineOptions documentation it looks like we intend for it to be
> serialized through Jackson rather than through Java serialization. I bet the
> Flink runner does this, and we probably just need to remove this cached
> PipelineOptions from the unbounded source.
>
> I'll let Luke and Max correct me on any or all of the above :)
>
> Thanks,
> Dan
>
> On Wed, Mar 16, 2016 at 10:57 PM, 刘见康 <[email protected]> wrote:
>>
>> Hi guys,
>>
>> Failed to run KafkaWindowedWordCountExample with an "unable to serialize" exception; the stack trace is below:
>>
>> 16/03/17 13:49:09 INFO flink.FlinkPipelineRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 160 files. Enable logging at DEBUG level to see which files will be staged.
>> 16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating the required Streaming Environment.
>> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic metadata from broker localhost:9092 in try 0/3
>> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to read the following topics (with number of partitions):
>> Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
>>   at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
>>   at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
>>   at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
>>   at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
>>   at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
>>   at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:497)
>>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>   at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
>>   ... 10 more
>>
>> I found a similar issue reported in flink-dataflow:
>> https://github.com/dataArtisans/flink-dataflow/issues/8
>>
>> Do you have an idea about this error?
>>
>> Thanks
>> Jiankang
>
>
