Repository: flink
Updated Branches:
  refs/heads/release-1.3 cd4c2b590 -> c1f578fba


[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

This closes #4496.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1f578fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1f578fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1f578fb

Branch: refs/heads/release-1.3
Commit: c1f578fba60be7b77e1588367721f57b52b61225
Parents: cd4c2b5
Author: Xpray <leonxp...@gmail.com>
Authored: Tue Aug 8 16:18:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Tue Aug 8 19:22:09 2017 +0800

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OperatorChain.java     | 12 ++++++++----
 .../streaming/api/StreamingOperatorsITCase.java    | 17 +++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 870c2ed..0875279 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -612,8 +612,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                                output.collect(shallowCopy);
                        }
 
-                       // don't copy for the last output
-                       outputs[outputs.length - 1].collect(record);
+                       if (outputs.length > 0) {
+                               // don't copy for the last output
+                               outputs[outputs.length - 1].collect(record);
+                       }
                }
 
                @Override
@@ -625,8 +627,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                                output.collect(outputTag, shallowCopy);
                        }
 
-                       // don't copy for the last output
-                       outputs[outputs.length - 1].collect(outputTag, record);
+                       if (outputs.length > 0) {
+                               // don't copy for the last output
+                               outputs[outputs.length - 1].collect(outputTag, record);
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 8ea1bd8..39a8dd7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.streaming.api;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 import org.junit.*;
 
@@ -373,4 +376,18 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
                        collections.clear();
                }
        }
+
+       @Test
+       public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+               DataStream<Integer> input = env.fromElements(1, 2, 3);
+               input.flatMap(new FlatMapFunction<Integer, Integer>() {
+                       @Override
+                       public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+                               out.collect(value << 1);
+                       }
+               });
+               env.execute();
+       }
 }

Reply via email to