[ https://issues.apache.org/jira/browse/FLINK-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582422#comment-16582422 ]
Aljoscha Krettek commented on FLINK-10155:
------------------------------------------
The underlying problem is that the {{WindowFunction}} directly emits its
{{Iterable}} argument to the next function. That {{Iterable}} has a
Flink-internal implementation that is not necessarily safe to serialize: as the
serialization trace shows, it holds a reference to the {{EvictingWindowOperator}}
itself. By default, Flink copies elements when passing them between chained
operators. If there is no known serializer for the element type, Flink falls
back to Kryo, which copies the object by serializing and deserializing it once.
That serialization step is what throws the exception you see here.
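A plain-Java sketch of this failure mode follows. It is an analogy rather than Flink code. It uses java.io serialization in place of Kryo, and every class and method name in it (`WindowOperatorLike`, `LazyView`, `roundTripCopy`, `canCopy`) is hypothetical. A lazy view that keeps a reference to its owning operator cannot be copied by a serialization round trip. The same elements materialized into an `ArrayList` can.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CopyViaSerialization {

    // Hypothetical stand-in for a window operator. It is NOT Serializable,
    // just as the real operator's internals are not meant to be serialized.
    public static class WindowOperatorLike {
        final List<String> buffer = new ArrayList<>(Arrays.asList("this", "tomfoolery"));

        // Hands out a lazy view over the buffer instead of a copy.
        public Iterable<String> lazyView() {
            return new LazyView(this);
        }
    }

    // The view is declared Serializable but keeps a reference to its owner,
    // so serializing the view means serializing the whole operator too.
    public static class LazyView implements Iterable<String>, Serializable {
        private final WindowOperatorLike owner;

        LazyView(WindowOperatorLike owner) {
            this.owner = owner;
        }

        @Override
        public Iterator<String> iterator() {
            return owner.buffer.iterator();
        }
    }

    // Copies a value by serializing it and deserializing it once.
    @SuppressWarnings("unchecked")
    public static <T> T roundTripCopy(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Returns true if the value survives a serialization round trip.
    public static boolean canCopy(Object value) {
        try {
            roundTripCopy(value);
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Iterable<String> view = new WindowOperatorLike().lazyView();

        // Fails: the view drags its non-serializable owner into the copy.
        System.out.println("copy lazy view: " + canCopy(view));

        // Works: materializing the elements first detaches them from the owner.
        List<String> materialized = new ArrayList<>();
        for (String s : view) {
            materialized.add(s);
        }
        System.out.println("copy ArrayList: " + roundTripCopy(materialized));
    }
}
```

Running `main` prints `copy lazy view: false` followed by `copy ArrayList: [this, tomfoolery]`. In the real job, Kryo fails differently (a `NullPointerException` deep inside the operator's state descriptor), but the cause is the same: the copy follows the view back into the operator.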
One solution is to put the code that would be in the {{MapFunction}} directly
into the {{WindowFunction}}. If that doesn't work, copy the contents of the
{{Iterable}} into a type that has a proper serializer.
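The two fixes can be sketched as window-function bodies. To keep the sketch self-contained, it declares a minimal stand-in for Flink's {{org.apache.flink.util.Collector}} instead of depending on Flink. The class and method names, and the joining logic used as the "map" step in the first fix, are hypothetical. In a real job, either body would go inside {{WindowFunction#apply}}.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowFunctionFixes {

    // Minimal stand-in for Flink's org.apache.flink.util.Collector so this
    // sketch compiles without Flink on the classpath.
    public interface Collector<T> {
        void collect(T record);
    }

    // Fix 1: do the downstream "map" work inside the window function and emit
    // only its result. Joining the elements is a hypothetical example of that work.
    public static void applyWithMapInlined(
            String key, Iterable<String> input, Collector<String> out) {
        out.collect(String.join(",", input));
    }

    // Fix 2: copy the window contents into a plain ArrayList before emitting, so
    // the next operator never receives the window operator's internal Iterable.
    public static void applyWithCopy(
            String key, Iterable<String> input, Collector<List<String>> out) {
        List<String> copy = new ArrayList<>();
        for (String element : input) {
            copy.add(element);
        }
        out.collect(copy);
    }
}
```

In the reporter's test, the second body would replace {{out.collect(input)}}, and the window function's output type would change from {{Iterable<String>}} to {{List<String>}}. The emitted value is then an ordinary {{java.util.ArrayList}}, which Kryo should be able to copy without following references back into the window operator.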
> JobExecutionException when using evictor followed with map
> ----------------------------------------------------------
>
> Key: FLINK-10155
> URL: https://issues.apache.org/jira/browse/FLINK-10155
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.6.0
> Reporter: Prithvi Raj
> Priority: Major
>
> The DataStream API throws a `JobExecutionException` when a `map` follows an
> event-time `timeWindow` that has an `evictor`.
> The exception is thrown at `out.collect` in the `WindowFunction`. It is not
> thrown when no `evictor` is used, or when event-time semantics are not used.
>
> The exception is:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
> at com.uber.jaeger.dependencies.DependenciesProcessorTest.testMapAfterWindowing(DependenciesProcessorTest.java:101)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:89)
> at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:86)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
> at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.onEventTime(EvictingWindowOperator.java:271)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> ... 22 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> strategies (org.apache.flink.api.common.state.StateTtlConfig$CleanupStrategies)
> cleanupStrategies (org.apache.flink.api.common.state.StateTtlConfig)
> ttlConfig (org.apache.flink.api.common.state.ListStateDescriptor)
> evictingWindowStateDescriptor (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator)
> this$0 (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$2)
> val$function (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
> val$fromIterable (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> ... 28 more
> Caused by: java.lang.NullPointerException
> at java.util.EnumMap$EnumMapIterator.hasNext(EnumMap.java:527)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ... 50 more{code}
>
> And the code is:
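> {code:java}
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
>
> @Test
> public void testMapAfterWindowing() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     DataStreamSource<String> stringStream =
>         env.fromElements("this", "is", "some", "data", "tomfoolery");
>     SingleOutputStreamOperator<String> source =
>         stringStream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
>     source.keyBy(new KeySelector<String, String>() {
>             @Override
>             public String getKey(String value) throws Exception {
>                 return value.substring(0, 1);
>             }
>         })
>         .timeWindow(Time.milliseconds(100))
>         .evictor(CountEvictor.of(1))
>         .apply(new WindowFunction<String, Iterable<String>, String, TimeWindow>() {
>             @Override
>             public void apply(String s, TimeWindow window, Iterable<String> input,
>                     Collector<Iterable<String>> out) throws Exception {
>                 // Emits the window's internal Iterable as-is; this is where the exception is thrown.
>                 out.collect(input);
>             }
>         })
>         .map(new RichMapFunction<Iterable<String>, Iterable<String>>() {
>             // Identity map function
>             @Override
>             public Iterable<String> map(Iterable<String> value) throws Exception {
>                 return value;
>             }
>         })
>         .printToErr();
>     env.execute();
> }{code}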
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)