This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fec2ccfdd259bfe4e7e1cda578daeec0e7b8a38c Author: ifndef-SleePy <[email protected]> AuthorDate: Fri Jun 14 21:59:25 2019 +0800 [FLINK-12832][datastream] Make slot sharing configurable in StreamGraphGenerator --- .../flink/streaming/api/graph/StreamGraph.java | 8 +- .../streaming/api/graph/StreamGraphGenerator.java | 21 +++++- .../flink/streaming/api/graph/StreamNode.java | 9 ++- .../api/graph/StreamGraphGeneratorTest.java | 88 ++++++++++++++++++++++ .../api/graph/StreamingJobGraphGeneratorTest.java | 56 ++++++++++++++ 5 files changed, 172 insertions(+), 10 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 942da2c..7381cde 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -184,7 +184,7 @@ public class StreamGraph extends StreamingPlan { } public <IN, OUT> void addSource(Integer vertexID, - String slotSharingGroup, + @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, @@ -195,7 +195,7 @@ public class StreamGraph extends StreamingPlan { } public <IN, OUT> void addSink(Integer vertexID, - String slotSharingGroup, + @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, @@ -207,7 +207,7 @@ public class StreamGraph extends StreamingPlan { public <IN, OUT> void addOperator( Integer vertexID, - String slotSharingGroup, + @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, @@ -271,7 +271,7 @@ public class StreamGraph extends StreamingPlan { } protected StreamNode addNode(Integer vertexID, - 
String slotSharingGroup, + @Nullable String slotSharingGroup, @Nullable String coLocationGroup, Class<? extends AbstractInvokable> vertexClass, StreamOperatorFactory<?> operatorFactory, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index a227da3..b5ab9c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -96,6 +96,8 @@ public class StreamGraphGenerator { /** The default buffer timeout (max delay of records in the network stack). */ public static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L; + public static final String DEFAULT_SLOT_SHARING_GROUP = "default"; + private final List<StreamTransformation<?>> transformations; private final ExecutionConfig executionConfig; @@ -106,6 +108,8 @@ public class StreamGraphGenerator { private boolean chaining = true; + private boolean isSlotSharingEnabled = true; + private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts; private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; @@ -143,6 +147,11 @@ public class StreamGraphGenerator { return this; } + public StreamGraphGenerator setSlotSharingEnabled(boolean isSlotSharingEnabled) { + this.isSlotSharingEnabled = isSlotSharingEnabled; + return this; + } + public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts) { this.userArtifacts = userArtifacts; return this; @@ -457,6 +466,10 @@ public class StreamGraphGenerator { } String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds); + // slot sharing group of iteration node must exist + if (slotSharingGroup == null) { + slotSharingGroup = "SlotSharingGroup-" + 
iterate.getId(); + } itSink.setSlotSharingGroup(slotSharingGroup); itSource.setSlotSharingGroup(slotSharingGroup); @@ -706,6 +719,10 @@ public class StreamGraphGenerator { * @param inputIds The IDs of the input operations. */ private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) { + if (!isSlotSharingEnabled) { + return null; + } + if (specifiedGroup != null) { return specifiedGroup; } else { @@ -715,10 +732,10 @@ public class StreamGraphGenerator { if (inputGroup == null) { inputGroup = inputGroupCandidate; } else if (!inputGroup.equals(inputGroupCandidate)) { - return "default"; + return DEFAULT_SLOT_SHARING_GROUP; } } - return inputGroup == null ? "default" : inputGroup; + return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index e3cc498..d5c974f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -55,7 +55,7 @@ public class StreamNode implements Serializable { private ResourceSpec preferredResources = ResourceSpec.DEFAULT; private long bufferTimeout; private final String operatorName; - private String slotSharingGroup; + private @Nullable String slotSharingGroup; private @Nullable String coLocationGroup; private KeySelector<?, ?> statePartitioner1; private KeySelector<?, ?> statePartitioner2; @@ -81,7 +81,7 @@ public class StreamNode implements Serializable { @VisibleForTesting public StreamNode( Integer id, - String slotSharingGroup, + @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperator<?> operator, String operatorName, @@ -93,7 +93,7 @@ public class StreamNode implements Serializable { public StreamNode( Integer id, - String 
slotSharingGroup, + @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<?> operatorFactory, String operatorName, @@ -269,10 +269,11 @@ public class StreamNode implements Serializable { this.outputFormat = outputFormat; } - public void setSlotSharingGroup(String slotSharingGroup) { + public void setSlotSharingGroup(@Nullable String slotSharingGroup) { this.slotSharingGroup = slotSharingGroup; } + @Nullable public String getSlotSharingGroup() { return slotSharingGroup; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 5e42a55..454c95c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; @@ -50,8 +51,13 @@ import org.apache.flink.streaming.util.NoOpIntMap; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -442,11 +448,93 @@ public class StreamGraphGeneratorTest { 
assertNotNull(iterationPair.f0.getCoLocationGroup()); assertEquals(iterationPair.f0.getCoLocationGroup(), iterationPair.f1.getCoLocationGroup()); + assertEquals(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, iterationPair.f0.getSlotSharingGroup()); + assertEquals(iterationPair.f0.getSlotSharingGroup(), iterationPair.f1.getSlotSharingGroup()); + } + } + + /** + * Test iteration job when disable slot sharing, check slot sharing group and co-location group. + */ + @Test + public void testIterationWithSlotSharingDisabled() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Integer> source = env.fromElements(1, 2, 3).name("source"); + IterativeStream<Integer> iteration = source.iterate(3000); + iteration.name("iteration").setParallelism(2); + DataStream<Integer> map = iteration.map(x -> x + 1).name("map").setParallelism(2); + DataStream<Integer> filter = map.filter((x) -> false).name("filter").setParallelism(2); + iteration.closeWith(filter).print(); + + List<StreamTransformation<?>> transformations = new ArrayList<>(); + transformations.add(source.getTransformation()); + transformations.add(iteration.getTransformation()); + transformations.add(map.getTransformation()); + transformations.add(filter.getTransformation()); + + StreamGraphGenerator generator = new StreamGraphGenerator(transformations, env.getConfig(), env.getCheckpointConfig()); + generator.setSlotSharingEnabled(false); + StreamGraph streamGraph = generator.generate(); + + for (Tuple2<StreamNode, StreamNode> iterationPair : streamGraph.getIterationSourceSinkPairs()) { + assertNotNull(iterationPair.f0.getCoLocationGroup()); + assertEquals(iterationPair.f0.getCoLocationGroup(), iterationPair.f1.getCoLocationGroup()); + assertNotNull(iterationPair.f0.getSlotSharingGroup()); assertEquals(iterationPair.f0.getSlotSharingGroup(), iterationPair.f1.getSlotSharingGroup()); } } + /** + * Test slot sharing is enabled. 
+ */ + @Test + public void testEnableSlotSharing() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3); + DataStream<Integer> mapDataStream = sourceDataStream.map(x -> x + 1); + + final List<StreamTransformation<?>> transformations = new ArrayList<>(); + transformations.add(sourceDataStream.getTransformation()); + transformations.add(mapDataStream.getTransformation()); + + // all stream nodes share default group by default + StreamGraph streamGraph = new StreamGraphGenerator( + transformations, env.getConfig(), env.getCheckpointConfig()) + .generate(); + + Collection<StreamNode> streamNodes = streamGraph.getStreamNodes(); + for (StreamNode streamNode : streamNodes) { + assertEquals(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, streamNode.getSlotSharingGroup()); + } + } + + /** + * Test slot sharing is disabled. + */ + @Test + public void testDisableSlotSharing() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3); + DataStream<Integer> mapDataStream = sourceDataStream.map(x -> x + 1); + + final List<StreamTransformation<?>> transformations = new ArrayList<>(); + transformations.add(sourceDataStream.getTransformation()); + transformations.add(mapDataStream.getTransformation()); + + // all stream nodes would have no group if slot sharing group is disabled + StreamGraph streamGraph = new StreamGraphGenerator( + transformations, env.getConfig(), env.getCheckpointConfig()) + .setSlotSharingEnabled(false) + .generate(); + + Collection<StreamNode> streamNodes = streamGraph.getStreamNodes(); + for (StreamNode streamNode : streamNodes) { + assertNull(streamNode.getSlotSharingGroup()); + } + } + private static class OutputTypeConfigurableOperationWithTwoInputs extends AbstractStreamOperator<Integer> implements TwoInputStreamOperator<Integer, Integer, Integer>, 
OutputTypeConfigurable<Integer> { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 4e3e49c..f8bed23 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -56,6 +57,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -446,4 +448,58 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertNotNull(iterationSinkCoLocationGroup); assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup); } + + /** + * Test that, with slot sharing disabled, only the iteration source and sink vertices get a (shared) slot sharing group.
+ */ + @Test + public void testDisableSlotSharingForIteration() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Integer> source = env.fromElements(1, 2, 3).name("source"); + IterativeStream<Integer> iteration = source.iterate(3000); + iteration.name("iteration").setParallelism(2); + DataStream<Integer> map = iteration.map(x -> x + 1).name("map").setParallelism(2); + DataStream<Integer> filter = map.filter((x) -> false).name("filter").setParallelism(2); + iteration.closeWith(filter).print(); + + List<StreamTransformation<?>> transformations = new ArrayList<>(); + transformations.add(source.getTransformation()); + transformations.add(iteration.getTransformation()); + transformations.add(map.getTransformation()); + transformations.add(filter.getTransformation()); + // when slot sharing group is disabled + // all job vertices except iteration vertex would have no slot sharing group + // iteration vertices would be set slot sharing group automatically + StreamGraphGenerator generator = new StreamGraphGenerator(transformations, env.getConfig(), env.getCheckpointConfig()); + generator.setSlotSharingEnabled(false); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(generator.generate()); + + SlotSharingGroup iterationSourceSlotSharingGroup = null; + SlotSharingGroup iterationSinkSlotSharingGroup = null; + + CoLocationGroup iterationSourceCoLocationGroup = null; + CoLocationGroup iterationSinkCoLocationGroup = null; + + for (JobVertex jobVertex : jobGraph.getVertices()) { + if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SOURCE_NAME_PREFIX)) { + iterationSourceSlotSharingGroup = jobVertex.getSlotSharingGroup(); + iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup(); + } else if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SINK_NAME_PREFIX)) { + iterationSinkSlotSharingGroup = jobVertex.getSlotSharingGroup(); + iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup(); + 
} else { + assertNull(jobVertex.getSlotSharingGroup()); + } + } + + assertNotNull(iterationSourceSlotSharingGroup); + assertNotNull(iterationSinkSlotSharingGroup); + assertEquals(iterationSourceSlotSharingGroup, iterationSinkSlotSharingGroup); + + assertNotNull(iterationSourceCoLocationGroup); + assertNotNull(iterationSinkCoLocationGroup); + assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup); + } }
