Repository: flink
Updated Branches:
  refs/heads/master 4dfefd042 -> 6f5fa7f74


[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

This closes #4496.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f5fa7f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f5fa7f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f5fa7f7

Branch: refs/heads/master
Commit: 6f5fa7f741538207244368c275bee9958c43a25a
Parents: 4dfefd0
Author: Xpray <leonxp...@gmail.com>
Authored: Tue Aug 8 16:18:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Tue Aug 8 19:20:32 2017 +0800

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OperatorChain.java     | 12 ++++++++----
 .../streaming/api/StreamingOperatorsITCase.java    | 17 +++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0f29b73..b15f126 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -612,8 +612,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                                output.collect(shallowCopy);
                        }
 
-                       // don't copy for the last output
-                       outputs[outputs.length - 1].collect(record);
+                       if (outputs.length > 0) {
+                               // don't copy for the last output
+                               outputs[outputs.length - 1].collect(record);
+                       }
                }
 
                @Override
@@ -625,8 +627,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                                output.collect(outputTag, shallowCopy);
                        }
 
-                       // don't copy for the last output
-                       outputs[outputs.length - 1].collect(outputTag, record);
+                       if (outputs.length > 0) {
+                               // don't copy for the last output
+                               outputs[outputs.length - 1].collect(outputTag, record);
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 6d2f8c5..32a04fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.streaming.api;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,8 @@ import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 
 import org.junit.Assert;
@@ -378,4 +381,18 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
                        collections.clear();
                }
        }
+
+       @Test
+       public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+               DataStream<Integer> input = env.fromElements(1, 2, 3);
+               input.flatMap(new FlatMapFunction<Integer, Integer>() {
+                       @Override
+                       public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+                               out.collect(value << 1);
+                       }
+               });
+               env.execute();
+       }
 }

Reply via email to