[ 
https://issues.apache.org/jira/browse/FLINK-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-10155:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issues has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> JobExecutionException when using evictor followed with map
> ----------------------------------------------------------
>
>                 Key: FLINK-10155
>                 URL: https://issues.apache.org/jira/browse/FLINK-10155
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.6.0
>            Reporter: Prithvi Raj
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> The DataStream API encounters `JobExecutionException` when using a `map` 
> after an event time `timeWindow` with an `evictor`. 
> The exception is thrown at `out.collect` on the `WindowFunction` and is not 
> thrown when an `evictor` isn't used, or when not using event time semantics.
>  
> The exception is: 
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
> at 
> com.uber.jaeger.dependencies.DependenciesProcessorTest.testMapAfterWindowing(DependenciesProcessorTest.java:101)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.RuntimeException: Exception occurred while processing 
> valve output watermark: 
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at 
> com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:89)
> at 
> com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:86)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.onEventTime(EvictingWindowOperator.java:271)
> at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> ... 7 more
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> ... 22 more
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.NullPointerException
> Serialization trace:
> strategies 
> (org.apache.flink.api.common.state.StateTtlConfig$CleanupStrategies)
> cleanupStrategies (org.apache.flink.api.common.state.StateTtlConfig)
> ttlConfig (org.apache.flink.api.common.state.ListStateDescriptor)
> evictingWindowStateDescriptor 
> (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator)
> this$0 
> (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$2)
> val$function 
> (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
> val$fromIterable 
> (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> ... 28 more
> Caused by: java.lang.NullPointerException
> at java.util.EnumMap$EnumMapIterator.hasNext(EnumMap.java:527)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
> at 
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ... 50 more{code}
>  
> And the code is:
> {code:java}
> @Test
> public void testMapAfterWindowing() throws Exception {
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     DataStreamSource<String> stringStream = env.fromElements("this", "is", 
> "some", "data", "tomfoolery");
>     SingleOutputStreamOperator<String> source = 
> stringStream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
>     source.keyBy(new KeySelector<String, String>() {
>         @Override
>         public String getKey(String value) throws Exception {
>             return value.substring(0, 1);
>         }
>     })
>             .timeWindow(Time.milliseconds(100))
>             .evictor(CountEvictor.of(1))
>             .apply(new WindowFunction<String, Iterable<String>, String, 
> TimeWindow>() {
>                 @Override
>                 public void apply(String s, TimeWindow window, 
> Iterable<String> input, Collector<Iterable<String>> out) throws Exception {
>                     out.collect(input);
>                 }
>             })
>             .map(new RichMapFunction<Iterable<String>, Iterable<String>>() {
>                 // Identity map function
>                 @Override
>                 public Iterable<String> map(Iterable<String> value) throws 
> Exception {
>                     return value;
>                 }
>             })
>             .printToErr();
>     env.execute();
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to