Hi guys, I failed to run KafkaWindowedWordCountExample because of an "unable to serialize" exception. The full stack trace is below:
16/03/17 13:49:09 INFO flink.FlinkPipelineRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 160 files. Enable logging at DEBUG level to see which files will be staged.
16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating the required Streaming Environment.
16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic metadata from broker localhost:9092 in try 0/3
16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to read the following topics (with number of partitions):
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
	at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
	at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
	at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
	at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
	at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
	at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
	... 10 more

I found a similar issue reported against flink-dataflow: https://github.com/dataArtisans/flink-dataflow/issues/8. Do you have any idea about this error?

Thanks,
Jiankang
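P.S. My (possibly wrong) reading of the stack trace: PipelineOptions objects are backed by a dynamic proxy whose handler is com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler, which does not implement java.io.Serializable, so if UnboundedFlinkSource keeps the options in a regular (non-transient) field, SerializableUtils.ensureSerializable() would fail exactly like this. Here is a tiny standalone sketch of that mechanism; FakeOptions and FakeSource are made-up names for illustration, not Beam classes:

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    // Stand-in for the dynamically proxied PipelineOptions: NOT Serializable.
    class FakeOptions { }

    // Stand-in for a Serializable source that keeps the options in a plain field.
    class FakeSource implements Serializable {
        private final FakeOptions options = new FakeOptions();  // non-transient, non-serializable

        public static void main(String[] args) throws Exception {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            // Fails with java.io.NotSerializableException: FakeOptions
            out.writeObject(new FakeSource());
        }
    }

Running that main() throws java.io.NotSerializableException: FakeOptions, which looks analogous to the ProxyInvocationHandler failure in the trace above, but I may be missing something.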
