@Max: Thanks for your quick fix, the serialization exception is solved. However, it now reports another one:

16/03/17 20:14:23 INFO flink.FlinkPipelineRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 158 files. Enable logging at DEBUG level to see which files will be staged.
16/03/17 20:14:23 INFO flink.FlinkPipelineExecutionEnvironment: Creating the required Streaming Environment.
16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumer08: Trying to get topic metadata from broker localhost:9092 in try 0/3
16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to read the following topics (with number of partitions):
Exception in thread "main" java.lang.RuntimeException: Flink Sources are supported only when running with the FlinkPipelineRunner.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource.getDefaultOutputCoder(UnboundedFlinkSource.java:71)
    at com.google.cloud.dataflow.sdk.io.Read$Unbounded.getDefaultOutputCoder(Read.java:230)
    at com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:294)
    at com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:309)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:167)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
    at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
    at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
    at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:127)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
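If I read the trace right, the failure happens at graph construction time: apply() tries to infer an output coder for the source before the Flink translation ever runs. For context, a minimal sketch of what I believe the failing step around line 127 of the example looks like; the topic, properties, class name, and variable names are my guesses, only the broker address localhost:9092 comes from the log output:

import java.util.Properties;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Kafka 0.8 consumer; the broker address is the one from my log output.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("zookeeper.connect", "localhost:2181");
    props.setProperty("group.id", "myGroup");
    FlinkKafkaConsumer08<String> kafkaConsumer =
        new FlinkKafkaConsumer08<>("myTopic", new SimpleStringSchema(), props);

    // apply() immediately tries to infer the output coder, which ends up in
    // UnboundedFlinkSource.getDefaultOutputCoder() and throws; this is the
    // equivalent of line 127 in the example. (I am guessing the wrapper's
    // constructor here; the example may build it differently.)
    PCollection<String> input =
        pipeline.apply(Read.from(new UnboundedFlinkSource<>(kafkaConsumer)));

    pipeline.run();
  }
}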
Diving into the UnboundedFlinkSource class, it looks like just a simple placeholder: it implements the UnboundedSource interface, but every method simply throws RuntimeException.
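To make the question concrete, here is a paraphrase of how the class reads to me; this is a sketch, not the exact source, and the method signatures are the Dataflow SDK's UnboundedSource ones as I understand them:

import java.util.List;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Paraphrased: a wrapper that only exists to smuggle a native Flink
// SourceFunction through the Dataflow SDK's Read.from() API. Every
// UnboundedSource method is a stub that throws, presumably because the
// Flink runner is expected to unwrap it during translation instead of
// ever calling them.
public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark>
    extends UnboundedSource<T, C> {

  private final SourceFunction<T> flinkSource; // e.g. a FlinkKafkaConsumer08

  public UnboundedFlinkSource(SourceFunction<T> source) {
    this.flinkSource = source;
  }

  public SourceFunction<T> getFlinkSource() {
    return flinkSource;
  }

  @Override
  public List<? extends UnboundedSource<T, C>> generateInitialSplits(
      int desiredNumSplits, PipelineOptions options) {
    throw new RuntimeException(
        "Flink Sources are supported only when running with the FlinkPipelineRunner.");
  }

  @Override
  public UnboundedReader<T> createReader(PipelineOptions options, C checkpointMark) {
    throw new RuntimeException(
        "Flink Sources are supported only when running with the FlinkPipelineRunner.");
  }

  @Override
  public Coder<C> getCheckpointMarkCoder() {
    throw new RuntimeException(
        "Flink Sources are supported only when running with the FlinkPipelineRunner.");
  }

  @Override
  public void validate() {
    throw new RuntimeException(
        "Flink Sources are supported only when running with the FlinkPipelineRunner.");
  }

  // This is the method from my stack trace (UnboundedFlinkSource.java:71):
  // Pipeline.applyInternal() calls it while inferring the output coder, i.e.
  // during graph construction, before the runner's translation ever runs.
  @Override
  public Coder<T> getDefaultOutputCoder() {
    throw new RuntimeException(
        "Flink Sources are supported only when running with the FlinkPipelineRunner.");
  }
}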
So I just wonder: is this Kafka streaming example actually runnable?

Thanks,
Jiankang

On Thu, Mar 17, 2016 at 7:35 PM, Maximilian Michels <[email protected]> wrote:
> @Dan: You're right that the PipelineOptions shouldn't be cached like
> this. In this particular wrapper, it was not even necessary.
>
> @Jiankang: I've pushed a fix to the repository with a few
> improvements. Could you please try again? You will have to recompile.
>
> Thanks,
> Max
>
> On Thu, Mar 17, 2016 at 8:44 AM, Dan Halperin <[email protected]> wrote:
> > +Max for the Flink Runner, and +Luke who wrote most of the initial code
> > around PipelineOptions.
> >
> > The UnboundedFlinkSource is caching the `PipelineOptions` object, here:
> > https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36
> >
> > I think this is a mismatch with how we intended them to be used. For
> > example, the PipelineOptions may be changed by a Runner between graph
> > construction time (when the UnboundedFlinkSource is created) and actual
> > pipeline execution time. This is partially why, for example,
> > PipelineOptions are provided by the Runner as an argument to functions
> > like DoFn.startBundle, .processElement, and .finishBundle.
> >
> > PipelineOptions itself does not extend Serializable, and per the
> > PipelineOptions documentation it looks like we intend for it to be
> > serialized through Jackson rather than through Java serialization. I bet
> > the Flink runner does this, and we probably just need to remove this
> > cached PipelineOptions from the unbounded source.
> >
> > I'll let Luke and Max correct me on any or all of the above :)
> >
> > Thanks,
> > Dan
> >
> > On Wed, Mar 16, 2016 at 10:57 PM, 刘见康 <[email protected]> wrote:
> >>
> >> Hi guys,
> >>
> >> Failed to run KafkaWindowedWordCountExample with an "unable to serialize"
> >> exception, the stack trace as below:
> >>
> >> 16/03/17 13:49:09 INFO flink.FlinkPipelineRunner:
> >> PipelineOptions.filesToStage was not specified. Defaulting to files from
> >> the classpath: will stage 160 files. Enable logging at DEBUG level to
> >> see which files will be staged.
> >> 16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating
> >> the required Streaming Environment.
> >> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic
> >> metadata from broker localhost:9092 in try 0/3
> >> 16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going
> >> to read the following topics (with number of partitions):
> >> Exception in thread "main" java.lang.IllegalArgumentException: unable to
> >> serialize
> >> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
> >> at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
> >> at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
> >> at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
> >> at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
> >> at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
> >> at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:497)
> >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> >> Caused by: java.io.NotSerializableException:
> >> com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> >> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >> at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
> >> ... 10 more
> >>
> >> I found there is a similar issue in flink-dataflow:
> >> https://github.com/dataArtisans/flink-dataflow/issues/8
> >>
> >> Do you have an idea about this error?
> >>
> >> Thanks
> >> Jiankang
