Repository: flink
Updated Branches:
  refs/heads/master 3c4e59a7f -> 3f0b9fee5


[FLINK-9809] [DataStream API] Allow setting co-location constraints on 
StreamTransformations.

This feature is currently only exposed on StreamTransformations (internal API)
rather than in the public API, because it is a hidden expert feature.

This closes #6309


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f0b9fee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f0b9fee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f0b9fee

Branch: refs/heads/master
Commit: 3f0b9fee5185fcc7200179459bed31fb4bd08bbf
Parents: 3c4e59a
Author: Stephan Ewen <[email protected]>
Authored: Wed Jul 11 17:48:10 2018 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jul 12 09:38:43 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamGraph.java  |  22 ++--
 .../api/graph/StreamGraphGenerator.java         |   7 +-
 .../flink/streaming/api/graph/StreamNode.java   |  14 +++
 .../api/graph/StreamingJobGraphGenerator.java   |  43 ++++++--
 .../transformations/StreamTransformation.java   |  34 +++++++
 .../StreamGraphCoLocationConstraintTest.java    | 102 +++++++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java   |   2 +
 .../runtime/tasks/StreamConfigChainer.java      |   8 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |   4 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |   4 +-
 10 files changed, 215 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index e5ed0c2..01768ad 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -53,6 +53,8 @@ import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -163,38 +165,41 @@ public class StreamGraph extends StreamingPlan {
 
        public <IN, OUT> void addSource(Integer vertexID,
                String slotSharingGroup,
+               @Nullable String coLocationGroup,
                StreamOperator<OUT> operatorObject,
                TypeInformation<IN> inTypeInfo,
                TypeInformation<OUT> outTypeInfo,
                String operatorName) {
-               addOperator(vertexID, slotSharingGroup, operatorObject, 
inTypeInfo, outTypeInfo, operatorName);
+               addOperator(vertexID, slotSharingGroup, coLocationGroup, 
operatorObject, inTypeInfo, outTypeInfo, operatorName);
                sources.add(vertexID);
        }
 
        public <IN, OUT> void addSink(Integer vertexID,
                String slotSharingGroup,
+               @Nullable String coLocationGroup,
                StreamOperator<OUT> operatorObject,
                TypeInformation<IN> inTypeInfo,
                TypeInformation<OUT> outTypeInfo,
                String operatorName) {
-               addOperator(vertexID, slotSharingGroup, operatorObject, 
inTypeInfo, outTypeInfo, operatorName);
+               addOperator(vertexID, slotSharingGroup, coLocationGroup, 
operatorObject, inTypeInfo, outTypeInfo, operatorName);
                sinks.add(vertexID);
        }
 
        public <IN, OUT> void addOperator(
                        Integer vertexID,
                        String slotSharingGroup,
+                       @Nullable String coLocationGroup,
                        StreamOperator<OUT> operatorObject,
                        TypeInformation<IN> inTypeInfo,
                        TypeInformation<OUT> outTypeInfo,
                        String operatorName) {
 
                if (operatorObject instanceof StoppableStreamSource) {
-                       addNode(vertexID, slotSharingGroup, 
StoppableSourceStreamTask.class, operatorObject, operatorName);
+                       addNode(vertexID, slotSharingGroup, coLocationGroup, 
StoppableSourceStreamTask.class, operatorObject, operatorName);
                } else if (operatorObject instanceof StreamSource) {
-                       addNode(vertexID, slotSharingGroup, 
SourceStreamTask.class, operatorObject, operatorName);
+                       addNode(vertexID, slotSharingGroup, coLocationGroup, 
SourceStreamTask.class, operatorObject, operatorName);
                } else {
-                       addNode(vertexID, slotSharingGroup, 
OneInputStreamTask.class, operatorObject, operatorName);
+                       addNode(vertexID, slotSharingGroup, coLocationGroup, 
OneInputStreamTask.class, operatorObject, operatorName);
                }
 
                TypeSerializer<IN> inSerializer = inTypeInfo != null && 
!(inTypeInfo instanceof MissingTypeInfo) ? 
inTypeInfo.createSerializer(executionConfig) : null;
@@ -223,13 +228,14 @@ public class StreamGraph extends StreamingPlan {
        public <IN1, IN2, OUT> void addCoOperator(
                        Integer vertexID,
                        String slotSharingGroup,
+                       @Nullable String coLocationGroup,
                        TwoInputStreamOperator<IN1, IN2, OUT> 
taskOperatorObject,
                        TypeInformation<IN1> in1TypeInfo,
                        TypeInformation<IN2> in2TypeInfo,
                        TypeInformation<OUT> outTypeInfo,
                        String operatorName) {
 
-               addNode(vertexID, slotSharingGroup, TwoInputStreamTask.class, 
taskOperatorObject, operatorName);
+               addNode(vertexID, slotSharingGroup, coLocationGroup, 
TwoInputStreamTask.class, taskOperatorObject, operatorName);
 
                TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && 
!(outTypeInfo instanceof MissingTypeInfo) ?
                                outTypeInfo.createSerializer(executionConfig) : 
null;
@@ -250,6 +256,7 @@ public class StreamGraph extends StreamingPlan {
 
        protected StreamNode addNode(Integer vertexID,
                String slotSharingGroup,
+               @Nullable String coLocationGroup,
                Class<? extends AbstractInvokable> vertexClass,
                StreamOperator<?> operatorObject,
                String operatorName) {
@@ -261,6 +268,7 @@ public class StreamGraph extends StreamingPlan {
                StreamNode vertex = new StreamNode(environment,
                        vertexID,
                        slotSharingGroup,
+                       coLocationGroup,
                        operatorObject,
                        operatorName,
                        new ArrayList<OutputSelector<?>>(),
@@ -593,6 +601,7 @@ public class StreamGraph extends StreamingPlan {
                ResourceSpec preferredResources) {
                StreamNode source = this.addNode(sourceId,
                        null,
+                       null,
                        StreamIterationHead.class,
                        null,
                        "IterationSource-" + loopId);
@@ -603,6 +612,7 @@ public class StreamGraph extends StreamingPlan {
 
                StreamNode sink = this.addNode(sinkId,
                        null,
+                       null,
                        StreamIterationTail.class,
                        null,
                        "IterationSink-" + loopId);

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 11a7002..2c4ae4a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -466,9 +466,11 @@ public class StreamGraphGenerator {
         * Transforms a {@code SourceTransformation}.
         */
        private <T> Collection<Integer> transformSource(SourceTransformation<T> 
source) {
-               String slotSharingGroup = 
determineSlotSharingGroup(source.getSlotSharingGroup(), new 
ArrayList<Integer>());
+               String slotSharingGroup = 
determineSlotSharingGroup(source.getSlotSharingGroup(), 
Collections.emptyList());
+
                streamGraph.addSource(source.getId(),
                                slotSharingGroup,
+                               source.getCoLocationGroupKey(),
                                source.getOperator(),
                                null,
                                source.getOutputType(),
@@ -493,6 +495,7 @@ public class StreamGraphGenerator {
 
                streamGraph.addSink(sink.getId(),
                                slotSharingGroup,
+                               sink.getCoLocationGroupKey(),
                                sink.getOperator(),
                                sink.getInput().getOutputType(),
                                null,
@@ -535,6 +538,7 @@ public class StreamGraphGenerator {
 
                streamGraph.addOperator(transform.getId(),
                                slotSharingGroup,
+                               transform.getCoLocationGroupKey(),
                                transform.getOperator(),
                                transform.getInputType(),
                                transform.getOutputType(),
@@ -580,6 +584,7 @@ public class StreamGraphGenerator {
                streamGraph.addCoOperator(
                                transform.getId(),
                                slotSharingGroup,
+                               transform.getCoLocationGroupKey(),
                                transform.getOperator(),
                                transform.getInputType1(),
                                transform.getInputType2(),

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 78ab877..fe12662 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -28,6 +28,8 @@ import 
org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,6 +56,7 @@ public class StreamNode implements Serializable {
        private Long bufferTimeout = null;
        private final String operatorName;
        private String slotSharingGroup;
+       private @Nullable String coLocationGroup;
        private KeySelector<?, ?> statePartitioner1;
        private KeySelector<?, ?> statePartitioner2;
        private TypeSerializer<?> stateKeySerializer;
@@ -77,10 +80,12 @@ public class StreamNode implements Serializable {
        public StreamNode(StreamExecutionEnvironment env,
                Integer id,
                String slotSharingGroup,
+               @Nullable String coLocationGroup,
                StreamOperator<?> operator,
                String operatorName,
                List<OutputSelector<?>> outputSelector,
                Class<? extends AbstractInvokable> jobVertexClass) {
+
                this.env = env;
                this.id = id;
                this.operatorName = operatorName;
@@ -88,6 +93,7 @@ public class StreamNode implements Serializable {
                this.outputSelectors = outputSelector;
                this.jobVertexClass = jobVertexClass;
                this.slotSharingGroup = slotSharingGroup;
+               this.coLocationGroup = coLocationGroup;
        }
 
        public void addInEdge(StreamEdge inEdge) {
@@ -253,6 +259,14 @@ public class StreamNode implements Serializable {
                return slotSharingGroup;
        }
 
+       public void setCoLocationGroup(@Nullable String coLocationGroup) {
+               this.coLocationGroup = coLocationGroup;
+       }
+
+       public @Nullable String getCoLocationGroup() {
+               return coLocationGroup;
+       }
+
        public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
                return (slotSharingGroup == null && 
downstreamVertex.slotSharingGroup == null) ||
                                (slotSharingGroup != null && 
slotSharingGroup.equals(downstreamVertex.slotSharingGroup));

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5b8254d..603b9e4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -152,7 +152,7 @@ public class StreamingJobGraphGenerator {
 
                setPhysicalEdges();
 
-               setSlotSharing();
+               setSlotSharingAndCoLocation();
 
                configureCheckpointing();
 
@@ -531,20 +531,43 @@ public class StreamingJobGraphGenerator {
                                && streamGraph.isChainingEnabled();
        }
 
-       private void setSlotSharing() {
-
-               Map<String, SlotSharingGroup> slotSharingGroups = new 
HashMap<>();
+       private void setSlotSharingAndCoLocation() {
+               final HashMap<String, SlotSharingGroup> slotSharingGroups = new 
HashMap<>();
+               final HashMap<String, Tuple2<SlotSharingGroup, 
CoLocationGroup>> coLocationGroups = new HashMap<>();
 
                for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
 
-                       String slotSharingGroup = 
streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
+                       final StreamNode node = 
streamGraph.getStreamNode(entry.getKey());
+                       final JobVertex vertex = entry.getValue();
+
+                       // configure slot sharing group
+                       final String slotSharingGroupKey = 
node.getSlotSharingGroup();
+                       final SlotSharingGroup sharingGroup;
+
+                       if (slotSharingGroupKey != null) {
+                               sharingGroup = 
slotSharingGroups.computeIfAbsent(
+                                               slotSharingGroupKey, (k) -> new 
SlotSharingGroup());
+                               vertex.setSlotSharingGroup(sharingGroup);
+                       } else {
+                               sharingGroup = null;
+                       }
+
+                       // configure co-location constraint
+                       final String coLocationGroupKey = 
node.getCoLocationGroup();
+                       if (coLocationGroupKey != null) {
+                               if (sharingGroup == null) {
+                                       throw new IllegalStateException("Cannot 
use a co-location constraint without a slot sharing group");
+                               }
+
+                               Tuple2<SlotSharingGroup, CoLocationGroup> 
constraint = coLocationGroups.computeIfAbsent(
+                                               coLocationGroupKey, (k) -> new 
Tuple2<>(sharingGroup, new CoLocationGroup()));
+
+                               if (constraint.f0 != sharingGroup) {
+                                       throw new IllegalStateException("Cannot 
co-locate operators from different slot sharing groups");
+                               }
 
-                       SlotSharingGroup group = 
slotSharingGroups.get(slotSharingGroup);
-                       if (group == null) {
-                               group = new SlotSharingGroup();
-                               slotSharingGroups.put(slotSharingGroup, group);
+                               vertex.updateCoLocationGroup(constraint.f1);
                        }
-                       entry.getValue().setSlotSharingGroup(group);
                }
 
                for (Tuple2<StreamNode, StreamNode> pair : 
streamGraph.getIterationSourceSinkPairs()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1f763bb..bfbe78d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -151,6 +153,9 @@ public abstract class StreamTransformation<T> {
 
        private String slotSharingGroup;
 
+       @Nullable
+       private String coLocationGroupKey;
+
        /**
         * Creates a new {@code StreamTransformation} with the given name, 
output type and parallelism.
         *
@@ -345,6 +350,35 @@ public abstract class StreamTransformation<T> {
        }
 
        /**
+        * <b>NOTE:</b> This is an internal undocumented feature for now. It is 
not
+        * clear whether this will be supported and stable in the long term.
+        *
+        * <p>Sets the key that identifies the co-location group.
+        * Operators with the same co-location key will have their 
corresponding subtasks
+        * placed into the same slot by the scheduler.
+        *
+        * <p>Setting this to null means there is no co-location constraint.
+        */
+       public void setCoLocationGroupKey(@Nullable String coLocationGroupKey) {
+               this.coLocationGroupKey = coLocationGroupKey;
+       }
+
+       /**
+        * <b>NOTE:</b> This is an internal undocumented feature for now. It is 
not
+        * clear whether this will be supported and stable in the long term.
+        *
+        * <p>Gets the key that identifies the co-location group.
+        * Operators with the same co-location key will have their 
corresponding subtasks
+        * placed into the same slot by the scheduler.
+        *
+        * <p>If this is null (which is the default), it means there is no 
co-location constraint.
+        */
+       @Nullable
+       public String getCoLocationGroupKey() {
+               return coLocationGroupKey;
+       }
+
+       /**
         * Tries to fill in the type information. Type information can be 
filled in
         * later when the program uses a type hint. This method checks whether 
the
         * type information has ever been accessed before and does not allow

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
new file mode 100644
index 0000000..a225681
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that check the hidden API for setting co-location constraints on
+ * stream transformations.
+ */
+public class StreamGraphCoLocationConstraintTest {
+
+       @Test
+       public void testSettingCoLocationConstraint() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(7);
+
+               // set up the test program
+               DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+               source.getTransformation().setCoLocationGroupKey("group1");
+
+               DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
+               step1.getTransformation().setCoLocationGroupKey("group2");
+
+               DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v);
+               step2.getTransformation().setCoLocationGroupKey("group1");
+
+               DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new 
DiscardingSink<>());
+               result.getTransformation().setCoLocationGroupKey("group2");
+
+               // get the graph
+               final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+               assertEquals(4, jobGraph.getNumberOfVertices());
+
+               List<JobVertex> vertices = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               for (JobVertex vertex : vertices) {
+                       assertNotNull(vertex.getCoLocationGroup());
+               }
+
+               assertEquals(vertices.get(0).getCoLocationGroup(), 
vertices.get(2).getCoLocationGroup());
+               assertEquals(vertices.get(1).getCoLocationGroup(), 
vertices.get(3).getCoLocationGroup());
+       }
+
+       @Test
+       public void testCoLocateDifferenSharingGroups() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(7);
+
+               // set up the test program
+               DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+               source.getTransformation().setSlotSharingGroup("ssg1");
+               source.getTransformation().setCoLocationGroupKey("co1");
+
+               DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
+               step1.getTransformation().setSlotSharingGroup("ssg2");
+               step1.getTransformation().setCoLocationGroupKey("co2");
+
+               DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v);
+               step2.getTransformation().setSlotSharingGroup("ssg3");
+               step2.getTransformation().setCoLocationGroupKey("co1");
+
+               DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new 
DiscardingSink<>());
+               result.getTransformation().setSlotSharingGroup("ssg4");
+               result.getTransformation().setCoLocationGroupKey("co2");
+
+               // get the graph
+               try {
+                       env.getStreamGraph().getJobGraph();
+                       fail("exception expected");
+               }
+               catch (IllegalStateException ignored) {}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 201e138..96eaa78 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -793,6 +793,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                                        null,
                                        null,
                                        null,
+                                       null,
                                        null
                                ),
                                new StreamNode(
@@ -802,6 +803,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                                        null,
                                        null,
                                        null,
+                                       null,
                                        null
                                ),
                                0,

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 74898a4..10e50ce 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -76,8 +76,8 @@ public class StreamConfigChainer {
 
                tailConfig.setChainedOutputs(Collections.singletonList(
                        new StreamEdge(
-                               new StreamNode(null, 
tailConfig.getChainIndex(), null, null, null, null, null),
-                               new StreamNode(null, chainIndex, null, null, 
null, null, null),
+                               new StreamNode(null, 
tailConfig.getChainIndex(), null, null, null, null, null, null),
+                               new StreamNode(null, chainIndex, null, null, 
null, null, null, null),
                                0,
                                Collections.<String>emptyList(),
                                null,
@@ -99,8 +99,8 @@ public class StreamConfigChainer {
                List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
                outEdgesInOrder.add(
                        new StreamEdge(
-                               new StreamNode(null, chainIndex, null, null, 
null, null, null),
-                               new StreamNode(null, chainIndex , null, null, 
null, null, null),
+                               new StreamNode(null, chainIndex, null, null, 
null, null, null, null),
+                               new StreamNode(null, chainIndex , null, null, 
null, null, null, null),
                                0,
                                Collections.<String>emptyList(),
                                new BroadcastPartitioner<Object>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 08032bd..b2f1b99 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -176,8 +176,8 @@ public class StreamTaskTestHarness<OUT> {
                };
 
                List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
-               StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", 
dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
-               StreamNode targetVertexDummy = new StreamNode(null, 1, "group", 
dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
+               StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", 
null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
+               StreamNode targetVertexDummy = new StreamNode(null, 1, "group", 
null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
 
                outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, 
targetVertexDummy, 0, new LinkedList<String>(), new 
BroadcastPartitioner<Object>(), null /* output tag */));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 78e6de2..4c1c424 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -116,8 +116,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> 
extends StreamTaskTest
                        private static final long serialVersionUID = 1L;
                };
 
-               StreamNode sourceVertexDummy = new StreamNode(null, 0, "default 
group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
-               StreamNode targetVertexDummy = new StreamNode(null, 1, "default 
group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
+               StreamNode sourceVertexDummy = new StreamNode(null, 0, "default 
group", null, dummyOperator, "source dummy", new 
LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+               StreamNode targetVertexDummy = new StreamNode(null, 1, "default 
group", null, dummyOperator, "target dummy", new 
LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 
                for (int i = 0; i < numInputGates; i++) {
 

Reply via email to