Repository: flink Updated Branches: refs/heads/master 3c4e59a7f -> 3f0b9fee5
[FLINK-9809] [DataStream API] Allow setting co-location constraints on StreamTransformations. This feature is currently only exposed on StreamTransformations (internal API) rather than in the public API, because it is a hidden expert feature. This closes #6309 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f0b9fee Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f0b9fee Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f0b9fee Branch: refs/heads/master Commit: 3f0b9fee5185fcc7200179459bed31fb4bd08bbf Parents: 3c4e59a Author: Stephan Ewen <[email protected]> Authored: Wed Jul 11 17:48:10 2018 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jul 12 09:38:43 2018 +0200 ---------------------------------------------------------------------- .../flink/streaming/api/graph/StreamGraph.java | 22 ++-- .../api/graph/StreamGraphGenerator.java | 7 +- .../flink/streaming/api/graph/StreamNode.java | 14 +++ .../api/graph/StreamingJobGraphGenerator.java | 43 ++++++-- .../transformations/StreamTransformation.java | 34 +++++++ .../StreamGraphCoLocationConstraintTest.java | 102 +++++++++++++++++++ .../runtime/tasks/OneInputStreamTaskTest.java | 2 + .../runtime/tasks/StreamConfigChainer.java | 8 +- .../runtime/tasks/StreamTaskTestHarness.java | 4 +- .../tasks/TwoInputStreamTaskTestHarness.java | 4 +- 10 files changed, 215 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index e5ed0c2..01768ad 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -53,6 +53,8 @@ import org.apache.flink.util.OutputTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -163,38 +165,41 @@ public class StreamGraph extends StreamingPlan { public <IN, OUT> void addSource(Integer vertexID, String slotSharingGroup, + @Nullable String coLocationGroup, StreamOperator<OUT> operatorObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { - addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName); + addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName); sources.add(vertexID); } public <IN, OUT> void addSink(Integer vertexID, String slotSharingGroup, + @Nullable String coLocationGroup, StreamOperator<OUT> operatorObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { - addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName); + addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName); sinks.add(vertexID); } public <IN, OUT> void addOperator( Integer vertexID, String slotSharingGroup, + @Nullable String coLocationGroup, StreamOperator<OUT> operatorObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { if (operatorObject instanceof StoppableStreamSource) { - addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName); + addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName); } else if (operatorObject instanceof 
StreamSource) { - addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName); + addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName); } else { - addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName); + addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName); } TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null; @@ -223,13 +228,14 @@ public class StreamGraph extends StreamingPlan { public <IN1, IN2, OUT> void addCoOperator( Integer vertexID, String slotSharingGroup, + @Nullable String coLocationGroup, TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject, TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { - addNode(vertexID, slotSharingGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName); + addNode(vertexID, slotSharingGroup, coLocationGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName); TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null; @@ -250,6 +256,7 @@ public class StreamGraph extends StreamingPlan { protected StreamNode addNode(Integer vertexID, String slotSharingGroup, + @Nullable String coLocationGroup, Class<? 
extends AbstractInvokable> vertexClass, StreamOperator<?> operatorObject, String operatorName) { @@ -261,6 +268,7 @@ public class StreamGraph extends StreamingPlan { StreamNode vertex = new StreamNode(environment, vertexID, slotSharingGroup, + coLocationGroup, operatorObject, operatorName, new ArrayList<OutputSelector<?>>(), @@ -593,6 +601,7 @@ public class StreamGraph extends StreamingPlan { ResourceSpec preferredResources) { StreamNode source = this.addNode(sourceId, null, + null, StreamIterationHead.class, null, "IterationSource-" + loopId); @@ -603,6 +612,7 @@ public class StreamGraph extends StreamingPlan { StreamNode sink = this.addNode(sinkId, null, + null, StreamIterationTail.class, null, "IterationSink-" + loopId); http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 11a7002..2c4ae4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -466,9 +466,11 @@ public class StreamGraphGenerator { * Transforms a {@code SourceTransformation}. 
*/ private <T> Collection<Integer> transformSource(SourceTransformation<T> source) { - String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), new ArrayList<Integer>()); + String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList()); + streamGraph.addSource(source.getId(), slotSharingGroup, + source.getCoLocationGroupKey(), source.getOperator(), null, source.getOutputType(), @@ -493,6 +495,7 @@ public class StreamGraphGenerator { streamGraph.addSink(sink.getId(), slotSharingGroup, + sink.getCoLocationGroupKey(), sink.getOperator(), sink.getInput().getOutputType(), null, @@ -535,6 +538,7 @@ public class StreamGraphGenerator { streamGraph.addOperator(transform.getId(), slotSharingGroup, + transform.getCoLocationGroupKey(), transform.getOperator(), transform.getInputType(), transform.getOutputType(), @@ -580,6 +584,7 @@ public class StreamGraphGenerator { streamGraph.addCoOperator( transform.getId(), slotSharingGroup, + transform.getCoLocationGroupKey(), transform.getOperator(), transform.getInputType1(), transform.getInputType2(), http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 78ab877..fe12662 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperator; +import 
javax.annotation.Nullable; + import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -54,6 +56,7 @@ public class StreamNode implements Serializable { private Long bufferTimeout = null; private final String operatorName; private String slotSharingGroup; + private @Nullable String coLocationGroup; private KeySelector<?, ?> statePartitioner1; private KeySelector<?, ?> statePartitioner2; private TypeSerializer<?> stateKeySerializer; @@ -77,10 +80,12 @@ public class StreamNode implements Serializable { public StreamNode(StreamExecutionEnvironment env, Integer id, String slotSharingGroup, + @Nullable String coLocationGroup, StreamOperator<?> operator, String operatorName, List<OutputSelector<?>> outputSelector, Class<? extends AbstractInvokable> jobVertexClass) { + this.env = env; this.id = id; this.operatorName = operatorName; @@ -88,6 +93,7 @@ public class StreamNode implements Serializable { this.outputSelectors = outputSelector; this.jobVertexClass = jobVertexClass; this.slotSharingGroup = slotSharingGroup; + this.coLocationGroup = coLocationGroup; } public void addInEdge(StreamEdge inEdge) { @@ -253,6 +259,14 @@ public class StreamNode implements Serializable { return slotSharingGroup; } + public void setCoLocationGroup(@Nullable String coLocationGroup) { + this.coLocationGroup = coLocationGroup; + } + + public @Nullable String getCoLocationGroup() { + return coLocationGroup; + } + public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) { return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null) || (slotSharingGroup != null && slotSharingGroup.equals(downstreamVertex.slotSharingGroup)); http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 5b8254d..603b9e4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -152,7 +152,7 @@ public class StreamingJobGraphGenerator { setPhysicalEdges(); - setSlotSharing(); + setSlotSharingAndCoLocation(); configureCheckpointing(); @@ -531,20 +531,43 @@ public class StreamingJobGraphGenerator { && streamGraph.isChainingEnabled(); } - private void setSlotSharing() { - - Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>(); + private void setSlotSharingAndCoLocation() { + final HashMap<String, SlotSharingGroup> slotSharingGroups = new HashMap<>(); + final HashMap<String, Tuple2<SlotSharingGroup, CoLocationGroup>> coLocationGroups = new HashMap<>(); for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { - String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup(); + final StreamNode node = streamGraph.getStreamNode(entry.getKey()); + final JobVertex vertex = entry.getValue(); + + // configure slot sharing group + final String slotSharingGroupKey = node.getSlotSharingGroup(); + final SlotSharingGroup sharingGroup; + + if (slotSharingGroupKey != null) { + sharingGroup = slotSharingGroups.computeIfAbsent( + slotSharingGroupKey, (k) -> new SlotSharingGroup()); + vertex.setSlotSharingGroup(sharingGroup); + } else { + sharingGroup = null; + } + + // configure co-location constraint + final String coLocationGroupKey = node.getCoLocationGroup(); + if (coLocationGroupKey != null) { + if (sharingGroup == null) { + throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group"); + } + + 
Tuple2<SlotSharingGroup, CoLocationGroup> constraint = coLocationGroups.computeIfAbsent( + coLocationGroupKey, (k) -> new Tuple2<>(sharingGroup, new CoLocationGroup())); + + if (constraint.f0 != sharingGroup) { + throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups"); + } - SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup); - if (group == null) { - group = new SlotSharingGroup(); - slotSharingGroups.put(slotSharingGroup, group); + vertex.updateCoLocationGroup(constraint.f1); } - entry.getValue().setSlotSharingGroup(group); } for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) { http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index 1f763bb..bfbe78d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.util.Collection; import static org.apache.flink.util.Preconditions.checkArgument; @@ -151,6 +153,9 @@ public abstract class StreamTransformation<T> { private String slotSharingGroup; + @Nullable + private String coLocationGroupKey; + /** * Creates a new {@code StreamTransformation} with the given name, output type and parallelism. 
* @@ -345,6 +350,35 @@ public abstract class StreamTransformation<T> { } /** + * <b>NOTE:</b> This is an internal undocumented feature for now. It is not + * clear whether this will be supported and stable in the long term. + * + * <p>Sets the key that identifies the co-location group. + * Operators with the same co-location key will have their corresponding subtasks + * placed into the same slot by the scheduler. + * + * <p>Setting this to null means there is no co-location constraint. + */ + public void setCoLocationGroupKey(@Nullable String coLocationGroupKey) { + this.coLocationGroupKey = coLocationGroupKey; + } + + /** + * <b>NOTE:</b> This is an internal undocumented feature for now. It is not + * clear whether this will be supported and stable in the long term. + * + * <p>Gets the key that identifies the co-location group. + * Operators with the same co-location key will have their corresponding subtasks + * placed into the same slot by the scheduler. + * + * <p>If this is null (which is the default), it means there is no co-location constraint. + */ + @Nullable + public String getCoLocationGroupKey() { + return coLocationGroupKey; + } + + /** * Tries to fill in the type information. Type information can be filled in * later when the program uses a type hint. 
This method checks whether the * type information has ever been accessed before and does not allow http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java new file mode 100644 index 0000000..a225681 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.graph; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; + +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Test that check the hidden API to set co location constraints on the + * stream transformations. + */ +public class StreamGraphCoLocationConstraintTest { + + @Test + public void testSettingCoLocationConstraint() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(7); + + // set up the test program + DataStream<Long> source = env.generateSequence(1L, 10_000_000); + source.getTransformation().setCoLocationGroupKey("group1"); + + DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v); + step1.getTransformation().setCoLocationGroupKey("group2"); + + DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v); + step2.getTransformation().setCoLocationGroupKey("group1"); + + DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new DiscardingSink<>()); + result.getTransformation().setCoLocationGroupKey("group2"); + + // get the graph + final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + assertEquals(4, jobGraph.getNumberOfVertices()); + + List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + for (JobVertex vertex : vertices) { + assertNotNull(vertex.getCoLocationGroup()); + } + + assertEquals(vertices.get(0).getCoLocationGroup(), vertices.get(2).getCoLocationGroup()); + assertEquals(vertices.get(1).getCoLocationGroup(), 
vertices.get(3).getCoLocationGroup()); + } + + @Test + public void testCoLocateDifferenSharingGroups() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(7); + + // set up the test program + DataStream<Long> source = env.generateSequence(1L, 10_000_000); + source.getTransformation().setSlotSharingGroup("ssg1"); + source.getTransformation().setCoLocationGroupKey("co1"); + + DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v); + step1.getTransformation().setSlotSharingGroup("ssg2"); + step1.getTransformation().setCoLocationGroupKey("co2"); + + DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v); + step2.getTransformation().setSlotSharingGroup("ssg3"); + step2.getTransformation().setCoLocationGroupKey("co1"); + + DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new DiscardingSink<>()); + result.getTransformation().setSlotSharingGroup("ssg4"); + result.getTransformation().setCoLocationGroupKey("co2"); + + // get the graph + try { + env.getStreamGraph().getJobGraph(); + fail("exception expected"); + } + catch (IllegalStateException ignored) {} + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 201e138..96eaa78 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -793,6 +793,7 @@ public class OneInputStreamTaskTest extends TestLogger { null, null, null, + null, null ), new StreamNode( 
@@ -802,6 +803,7 @@ public class OneInputStreamTaskTest extends TestLogger { null, null, null, + null, null ), 0, http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index 74898a4..10e50ce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -76,8 +76,8 @@ public class StreamConfigChainer { tailConfig.setChainedOutputs(Collections.singletonList( new StreamEdge( - new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null), - new StreamNode(null, chainIndex, null, null, null, null, null), + new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null, null), + new StreamNode(null, chainIndex, null, null, null, null, null, null), 0, Collections.<String>emptyList(), null, @@ -99,8 +99,8 @@ public class StreamConfigChainer { List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>(); outEdgesInOrder.add( new StreamEdge( - new StreamNode(null, chainIndex, null, null, null, null, null), - new StreamNode(null, chainIndex , null, null, null, null, null), + new StreamNode(null, chainIndex, null, null, null, null, null, null), + new StreamNode(null, chainIndex , null, null, null, null, null, null), 0, Collections.<String>emptyList(), new BroadcastPartitioner<Object>(), http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 08032bd..b2f1b99 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -176,8 +176,8 @@ public class StreamTaskTestHarness<OUT> { }; List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>(); - StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); - StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode targetVertexDummy = new StreamNode(null, 1, "group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */)); http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index 78e6de2..4c1c424 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -116,8 +116,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest private static final long serialVersionUID = 1L; }; - StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); - StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); for (int i = 0; i < numInputGates; i++) {
