Hi Jiankang,

Thanks for reporting back. I'm sorry you ran into another problem. This
example used to work, but it had a few small problems with the new code
base we just migrated to.

I've fixed and tested the example and would invite you to try again.
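
For context: UnboundedFlinkSource is only a placeholder that the
FlinkPipelineRunner swaps out for the native Flink source when it
translates the pipeline, which is why most of its UnboundedSource
methods just throw. The trouble was that getDefaultOutputCoder() runs
while the graph is still being built, before the runner gets to do that
translation. The shape of the fix is roughly the following -- a
simplified sketch, not the exact commit, and the class and field names
here are illustrative:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import com.google.cloud.dataflow.sdk.coders.Coder;

    // Simplified sketch: the placeholder must be usable at
    // graph-construction time, so it returns a caller-supplied coder
    // instead of throwing.
    public class FlinkSourcePlaceholder<T> {
      private final SourceFunction<T> flinkSource;
      private final Coder<T> coder;

      public FlinkSourcePlaceholder(SourceFunction<T> source, Coder<T> coder) {
        this.flinkSource = source;
        this.coder = coder;
      }

      public SourceFunction<T> getFlinkSource() {
        return flinkSource;
      }

      // Called by the SDK during graph construction, i.e. before the
      // FlinkPipelineRunner replaces this source at translation time,
      // so it must not throw.
      public Coder<T> getDefaultOutputCoder() {
        return coder;
      }
    }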

Thanks,
Max

On Thu, Mar 17, 2016 at 1:25 PM, 刘见康 <[email protected]> wrote:
> @Max:
> Thanks for your quick fix; the serialization exception is solved.
> However, it now reports another one:
> 16/03/17 20:14:23 INFO flink.FlinkPipelineRunner:
> PipelineOptions.filesToStage was not specified. Defaulting to files from
> the classpath: will stage 158 files. Enable logging at DEBUG level to see
> which files will be staged.
> 16/03/17 20:14:23 INFO flink.FlinkPipelineExecutionEnvironment: Creating
> the required Streaming Environment.
> 16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumer08: Trying to get topic
> metadata from broker localhost:9092 in try 0/3
> 16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to
> read the following topics (with number of partitions):
> Exception in thread "main" java.lang.RuntimeException: Flink Sources are
> supported only when running with the FlinkPipelineRunner.
> at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource.getDefaultOutputCoder(UnboundedFlinkSource.java:71)
> at com.google.cloud.dataflow.sdk.io.Read$Unbounded.getDefaultOutputCoder(Read.java:230)
> at com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:294)
> at com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:309)
> at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:167)
> at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
> at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
> at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
> at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331)
> at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
> at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
> at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:127)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>
> Diving into the UnboundedFlinkSource class, it looks like a simple class
> implementing the UnboundedSource interface whose methods just throw
> RuntimeException (see the sketch below).
> I just wonder whether this Kafka streaming example is runnable at all?
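>
> For reference, every UnboundedSource method I see in that class is a
> stub of roughly this shape (paraphrased from my reading, not an exact
> copy of the source):
>
>     @Override
>     public Coder<T> getDefaultOutputCoder() {
>       // Paraphrased: the class assumes the FlinkPipelineRunner will
>       // replace it with a native Flink source before any of these
>       // methods are ever called.
>       throw new RuntimeException("Flink Sources are supported only "
>           + "when running with the FlinkPipelineRunner.");
>     }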
>
> Thanks
> Jiankang
>
>
> On Thu, Mar 17, 2016 at 7:35 PM, Maximilian Michels <[email protected]> wrote:
>
>> @Dan: You're right that the PipelineOptions shouldn't be cached like
>> this. In this particular wrapper, it was not even necessary.
>>
>> @Jiankang: I've pushed a fix to the repository with a few
>> improvements. Could you please try again? You will have to recompile.
>>
>> Thanks,
>> Max
>>
>> On Thu, Mar 17, 2016 at 8:44 AM, Dan Halperin <[email protected]> wrote:
>> > +Max for the Flink Runner, and +Luke who wrote most of the initial code
>> > around PipelineOptions.
>> >
>> > The UnboundedFlinkSource is caching the `PipelineOptions` object, here:
>> > https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36
>> >
>> > I think this is a mismatch with how we intended them to be used. For
>> > example, the PipelineOptions may be changed by a Runner between graph
>> > construction time (when the UnboundedFlinkSource is created) and actual
>> > pipeline execution time. This is partially why, for example,
>> > PipelineOptions are provided by the Runner as an argument to functions
>> > like DoFn.startBundle, .processElement, and .finishBundle.
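>> >
>> > As a tiny illustration of that pattern (a sketch, not code from the
>> > repo; MyOptions is a made-up options interface), a DoFn should read
>> > the options the runner hands it at execution time rather than cache
>> > them in a field at construction time:
>> >
>> >     import com.google.cloud.dataflow.sdk.transforms.DoFn;
>> >
>> >     class EchoFn extends DoFn<String, String> {
>> >       @Override
>> >       public void processElement(ProcessContext c) {
>> >         // The runner supplies the current (possibly updated) options.
>> >         MyOptions opts = c.getPipelineOptions().as(MyOptions.class);
>> >         c.output(c.element());
>> >       }
>> >     }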
>> >
>> > PipelineOptions itself does not extend Serializable, and per the
>> > PipelineOptions documentation it looks like we intend for it to be
>> > serialized through Jackson rather than through Java serialization. I bet
>> > the Flink runner does this, and we probably just need to remove this
>> > cached PipelineOptions from the unbounded source.
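>> >
>> > E.g., I'd expect a plain Jackson round trip like the following to work
>> > (an untested sketch), while Java-serializing the same object fails
>> > with exactly the NotSerializableException quoted below:
>> >
>> >     import com.fasterxml.jackson.databind.ObjectMapper;
>> >     import com.google.cloud.dataflow.sdk.options.PipelineOptions;
>> >     import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>> >
>> >     public class OptionsRoundTrip {
>> >       public static void main(String[] args) throws Exception {
>> >         PipelineOptions options = PipelineOptionsFactory.create();
>> >         // PipelineOptions carries Jackson (de)serialization hooks, so
>> >         // a plain ObjectMapper can round-trip the proxy even though
>> >         // it is not java.io.Serializable.
>> >         ObjectMapper mapper = new ObjectMapper();
>> >         String json = mapper.writeValueAsString(options);
>> >         PipelineOptions restored =
>> >             mapper.readValue(json, PipelineOptions.class);
>> >         System.out.println(restored);
>> >       }
>> >     }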
>> >
>> > I'll let Luke and Max correct me on any or all of the above :)
>> >
>> > Thanks,
>> > Dan
>> >
>> > On Wed, Mar 16, 2016 at 10:57 PM, 刘见康 <[email protected]> wrote:
>> >>
>> >> Hi guys,
>> >>
>> >> I failed to run KafkaWindowedWordCountExample with an "unable to
>> >> serialize" exception; the stack trace is below:
>> >>
>> >> 16/03/17 13:49:09 INFO flink.FlinkPipelineRunner:
>> >> PipelineOptions.filesToStage was not specified. Defaulting to files from
>> >> the classpath: will stage 160 files. Enable logging at DEBUG level to see
>> >> which files will be staged.
>> >> 16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating
>> >> the required Streaming Environment.
>> >> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic
>> >> metadata from broker localhost:9092 in try 0/3
>> >> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to
>> >> read the following topics (with number of partitions):
>> >> Exception in thread "main" java.lang.IllegalArgumentException: unable to
>> >> serialize
>> >> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
>> >> at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
>> >> at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
>> >> at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
>> >> at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
>> >> at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
>> >> at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> at java.lang.reflect.Method.invoke(Method.java:497)
>> >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> >> Caused by: java.io.NotSerializableException:
>> >> com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
>> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> >> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> >> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> >> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> >> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> >> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> >> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> >> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> >> at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
>> >> ... 10 more
>> >>
>> >> I found there is a similar issue in flink-dataflow
>> >> https://github.com/dataArtisans/flink-dataflow/issues/8.
>> >>
>> >> Do you have an idea about this error?
>> >>
>> >> Thanks
>> >> Jiankang
>> >
>> >
>>
