Repository: flink
Updated Branches:
  refs/heads/release-1.3 cd4c2b590 -> c1f578fba

[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

This closes #4496.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1f578fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1f578fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1f578fb

Branch: refs/heads/release-1.3
Commit: c1f578fba60be7b77e1588367721f57b52b61225
Parents: cd4c2b5
Author: Xpray <leonxp...@gmail.com>
Authored: Tue Aug 8 16:18:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Tue Aug 8 19:22:09 2017 +0800

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OperatorChain.java | 12 ++++++++----
 .../streaming/api/StreamingOperatorsITCase.java | 17 +++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 870c2ed..0875279 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -612,8 +612,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				output.collect(shallowCopy);
 			}
 
-			// don't copy for the last output
-			outputs[outputs.length - 1].collect(record);
+			if (outputs.length > 0) {
+				// don't copy for the last output
+				outputs[outputs.length - 1].collect(record);
+			}
 		}
 
 		@Override
@@ -625,8 +627,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				output.collect(outputTag, shallowCopy);
 			}
 
-			// don't copy for the last output
-			outputs[outputs.length - 1].collect(outputTag, record);
+			if (outputs.length > 0) {
+				// don't copy for the last output
+				outputs[outputs.length - 1].collect(outputTag, record);
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 8ea1bd8..39a8dd7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.streaming.api;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 import org.junit.*;
 
@@ -373,4 +376,18 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			collections.clear();
 		}
 	}
+
+	@Test
+	public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		DataStream<Integer> input = env.fromElements(1, 2, 3);
+		input.flatMap(new FlatMapFunction<Integer, Integer>() {
+			@Override
+			public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+				out.collect(value << 1);
+			}
+		});
+		env.execute();
+	}
 }