This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fec2ccfdd259bfe4e7e1cda578daeec0e7b8a38c
Author: ifndef-SleePy <[email protected]>
AuthorDate: Fri Jun 14 21:59:25 2019 +0800

    [FLINK-12832][datastream] Make slot sharing configurable in StreamGraphGenerator
---
 .../flink/streaming/api/graph/StreamGraph.java     |  8 +-
 .../streaming/api/graph/StreamGraphGenerator.java  | 21 +++++-
 .../flink/streaming/api/graph/StreamNode.java      |  9 ++-
 .../api/graph/StreamGraphGeneratorTest.java        | 88 ++++++++++++++++++++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 56 ++++++++++++++
 5 files changed, 172 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 942da2c..7381cde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -184,7 +184,7 @@ public class StreamGraph extends StreamingPlan {
        }
 
        public <IN, OUT> void addSource(Integer vertexID,
-               String slotSharingGroup,
+               @Nullable String slotSharingGroup,
                @Nullable String coLocationGroup,
                StreamOperatorFactory<OUT> operatorFactory,
                TypeInformation<IN> inTypeInfo,
@@ -195,7 +195,7 @@ public class StreamGraph extends StreamingPlan {
        }
 
        public <IN, OUT> void addSink(Integer vertexID,
-               String slotSharingGroup,
+               @Nullable String slotSharingGroup,
                @Nullable String coLocationGroup,
                StreamOperatorFactory<OUT> operatorFactory,
                TypeInformation<IN> inTypeInfo,
@@ -207,7 +207,7 @@ public class StreamGraph extends StreamingPlan {
 
        public <IN, OUT> void addOperator(
                        Integer vertexID,
-                       String slotSharingGroup,
+                       @Nullable String slotSharingGroup,
                        @Nullable String coLocationGroup,
                        StreamOperatorFactory<OUT> operatorFactory,
                        TypeInformation<IN> inTypeInfo,
@@ -271,7 +271,7 @@ public class StreamGraph extends StreamingPlan {
        }
 
        protected StreamNode addNode(Integer vertexID,
-               String slotSharingGroup,
+               @Nullable String slotSharingGroup,
                @Nullable String coLocationGroup,
                Class<? extends AbstractInvokable> vertexClass,
                StreamOperatorFactory<?> operatorFactory,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index a227da3..b5ab9c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -96,6 +96,8 @@ public class StreamGraphGenerator {
        /** The default buffer timeout (max delay of records in the network stack). */
        public static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
 
+       public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
+
        private final List<StreamTransformation<?>> transformations;
 
        private final ExecutionConfig executionConfig;
@@ -106,6 +108,8 @@ public class StreamGraphGenerator {
 
        private boolean chaining = true;
 
+       private boolean isSlotSharingEnabled = true;
+
        private Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> userArtifacts;
 
        private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
@@ -143,6 +147,11 @@ public class StreamGraphGenerator {
                return this;
        }
 
+       public StreamGraphGenerator setSlotSharingEnabled(boolean 
isSlotSharingEnabled) {
+               this.isSlotSharingEnabled = isSlotSharingEnabled;
+               return this;
+       }
+
        public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> userArtifacts) {
                this.userArtifacts = userArtifacts;
                return this;
@@ -457,6 +466,10 @@ public class StreamGraphGenerator {
                }
 
                String slotSharingGroup = determineSlotSharingGroup(null, 
allFeedbackIds);
+               // slot sharing group of iteration node must exist
+               if (slotSharingGroup == null) {
+                       slotSharingGroup = "SlotSharingGroup-" + 
iterate.getId();
+               }
 
                itSink.setSlotSharingGroup(slotSharingGroup);
                itSource.setSlotSharingGroup(slotSharingGroup);
@@ -706,6 +719,10 @@ public class StreamGraphGenerator {
         * @param inputIds The IDs of the input operations.
         */
        private String determineSlotSharingGroup(String specifiedGroup, 
Collection<Integer> inputIds) {
+               if (!isSlotSharingEnabled) {
+                       return null;
+               }
+
                if (specifiedGroup != null) {
                        return specifiedGroup;
                } else {
@@ -715,10 +732,10 @@ public class StreamGraphGenerator {
                                if (inputGroup == null) {
                                        inputGroup = inputGroupCandidate;
                                } else if 
(!inputGroup.equals(inputGroupCandidate)) {
-                                       return "default";
+                                       return DEFAULT_SLOT_SHARING_GROUP;
                                }
                        }
-                       return inputGroup == null ? "default" : inputGroup;
+                       return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP 
: inputGroup;
                }
        }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index e3cc498..d5c974f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -55,7 +55,7 @@ public class StreamNode implements Serializable {
        private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
        private long bufferTimeout;
        private final String operatorName;
-       private String slotSharingGroup;
+       private @Nullable String slotSharingGroup;
        private @Nullable String coLocationGroup;
        private KeySelector<?, ?> statePartitioner1;
        private KeySelector<?, ?> statePartitioner2;
@@ -81,7 +81,7 @@ public class StreamNode implements Serializable {
        @VisibleForTesting
        public StreamNode(
                        Integer id,
-                       String slotSharingGroup,
+                       @Nullable String slotSharingGroup,
                        @Nullable String coLocationGroup,
                        StreamOperator<?> operator,
                        String operatorName,
@@ -93,7 +93,7 @@ public class StreamNode implements Serializable {
 
        public StreamNode(
                Integer id,
-               String slotSharingGroup,
+               @Nullable String slotSharingGroup,
                @Nullable String coLocationGroup,
                StreamOperatorFactory<?> operatorFactory,
                String operatorName,
@@ -269,10 +269,11 @@ public class StreamNode implements Serializable {
                this.outputFormat = outputFormat;
        }
 
-       public void setSlotSharingGroup(String slotSharingGroup) {
+       public void setSlotSharingGroup(@Nullable String slotSharingGroup) {
                this.slotSharingGroup = slotSharingGroup;
        }
 
+       @Nullable
        public String getSlotSharingGroup() {
                return slotSharingGroup;
        }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 5e42a55..454c95c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
@@ -50,8 +51,13 @@ import org.apache.flink.streaming.util.NoOpIntMap;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -442,11 +448,93 @@ public class StreamGraphGeneratorTest {
                        assertNotNull(iterationPair.f0.getCoLocationGroup());
                        assertEquals(iterationPair.f0.getCoLocationGroup(), 
iterationPair.f1.getCoLocationGroup());
 
+                       
assertEquals(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, 
iterationPair.f0.getSlotSharingGroup());
+                       assertEquals(iterationPair.f0.getSlotSharingGroup(), 
iterationPair.f1.getSlotSharingGroup());
+               }
+       }
+
+       /**
+        * Test iteration job when disable slot sharing, check slot sharing group and co-location group.
+        */
+       @Test
+       public void testIterationWithSlotSharingDisabled() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Integer> source = env.fromElements(1, 2, 
3).name("source");
+               IterativeStream<Integer> iteration = source.iterate(3000);
+               iteration.name("iteration").setParallelism(2);
+               DataStream<Integer> map = iteration.map(x -> x + 
1).name("map").setParallelism(2);
+               DataStream<Integer> filter = map.filter((x) -> 
false).name("filter").setParallelism(2);
+               iteration.closeWith(filter).print();
+
+               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
+               transformations.add(source.getTransformation());
+               transformations.add(iteration.getTransformation());
+               transformations.add(map.getTransformation());
+               transformations.add(filter.getTransformation());
+
+               StreamGraphGenerator generator = new 
StreamGraphGenerator(transformations, env.getConfig(), 
env.getCheckpointConfig());
+               generator.setSlotSharingEnabled(false);
+               StreamGraph streamGraph = generator.generate();
+
+               for (Tuple2<StreamNode, StreamNode> iterationPair : 
streamGraph.getIterationSourceSinkPairs()) {
+                       assertNotNull(iterationPair.f0.getCoLocationGroup());
+                       assertEquals(iterationPair.f0.getCoLocationGroup(), 
iterationPair.f1.getCoLocationGroup());
+
                        assertNotNull(iterationPair.f0.getSlotSharingGroup());
                        assertEquals(iterationPair.f0.getSlotSharingGroup(), 
iterationPair.f1.getSlotSharingGroup());
                }
        }
 
+       /**
+        * Test slot sharing is enabled.
+        */
+       @Test
+       public void testEnableSlotSharing() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 
3);
+               DataStream<Integer> mapDataStream = sourceDataStream.map(x -> x 
+ 1);
+
+               final List<StreamTransformation<?>> transformations = new 
ArrayList<>();
+               transformations.add(sourceDataStream.getTransformation());
+               transformations.add(mapDataStream.getTransformation());
+
+               // all stream nodes share default group by default
+               StreamGraph streamGraph = new StreamGraphGenerator(
+                               transformations, env.getConfig(), 
env.getCheckpointConfig())
+                       .generate();
+
+               Collection<StreamNode> streamNodes = 
streamGraph.getStreamNodes();
+               for (StreamNode streamNode : streamNodes) {
+                       
assertEquals(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, 
streamNode.getSlotSharingGroup());
+               }
+       }
+
+       /**
+        * Test slot sharing is disabled.
+        */
+       @Test
+       public void testDisableSlotSharing() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 
3);
+               DataStream<Integer> mapDataStream = sourceDataStream.map(x -> x 
+ 1);
+
+               final List<StreamTransformation<?>> transformations = new 
ArrayList<>();
+               transformations.add(sourceDataStream.getTransformation());
+               transformations.add(mapDataStream.getTransformation());
+
+               // all stream nodes would have no group if slot sharing group is disabled
+               StreamGraph streamGraph = new StreamGraphGenerator(
+                               transformations, env.getConfig(), 
env.getCheckpointConfig())
+                       .setSlotSharingEnabled(false)
+                       .generate();
+
+               Collection<StreamNode> streamNodes = 
streamGraph.getStreamNodes();
+               for (StreamNode streamNode : streamNodes) {
+                       assertNull(streamNode.getSlotSharingGroup());
+               }
+       }
+
        private static class OutputTypeConfigurableOperationWithTwoInputs
                        extends AbstractStreamOperator<Integer>
                        implements TwoInputStreamOperator<Integer, Integer, 
Integer>, OutputTypeConfigurable<Integer> {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4e3e49c..f8bed23 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -56,6 +57,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -446,4 +448,58 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                assertNotNull(iterationSinkCoLocationGroup);
                assertEquals(iterationSourceCoLocationGroup, 
iterationSinkCoLocationGroup);
        }
+
+       /**
+        * Test slot sharing group is enabled or disabled for iteration.
+        */
+       @Test
+       public void testDisableSlotSharingForIteration() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Integer> source = env.fromElements(1, 2, 
3).name("source");
+               IterativeStream<Integer> iteration = source.iterate(3000);
+               iteration.name("iteration").setParallelism(2);
+               DataStream<Integer> map = iteration.map(x -> x + 
1).name("map").setParallelism(2);
+               DataStream<Integer> filter = map.filter((x) -> 
false).name("filter").setParallelism(2);
+               iteration.closeWith(filter).print();
+
+               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
+               transformations.add(source.getTransformation());
+               transformations.add(iteration.getTransformation());
+               transformations.add(map.getTransformation());
+               transformations.add(filter.getTransformation());
+               // when slot sharing group is disabled
+               // all job vertices except iteration vertex would have no slot sharing group
+               // iteration vertices would be set slot sharing group automatically
+               StreamGraphGenerator generator = new 
StreamGraphGenerator(transformations, env.getConfig(), 
env.getCheckpointConfig());
+               generator.setSlotSharingEnabled(false);
+
+               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(generator.generate());
+
+               SlotSharingGroup iterationSourceSlotSharingGroup = null;
+               SlotSharingGroup iterationSinkSlotSharingGroup = null;
+
+               CoLocationGroup iterationSourceCoLocationGroup = null;
+               CoLocationGroup iterationSinkCoLocationGroup = null;
+
+               for (JobVertex jobVertex : jobGraph.getVertices()) {
+                       if 
(jobVertex.getName().startsWith(StreamGraph.ITERATION_SOURCE_NAME_PREFIX)) {
+                               iterationSourceSlotSharingGroup = 
jobVertex.getSlotSharingGroup();
+                               iterationSourceCoLocationGroup = 
jobVertex.getCoLocationGroup();
+                       } else if 
(jobVertex.getName().startsWith(StreamGraph.ITERATION_SINK_NAME_PREFIX)) {
+                               iterationSinkSlotSharingGroup = 
jobVertex.getSlotSharingGroup();
+                               iterationSinkCoLocationGroup = 
jobVertex.getCoLocationGroup();
+                       } else {
+                               assertNull(jobVertex.getSlotSharingGroup());
+                       }
+               }
+
+               assertNotNull(iterationSourceSlotSharingGroup);
+               assertNotNull(iterationSinkSlotSharingGroup);
+               assertEquals(iterationSourceSlotSharingGroup, 
iterationSinkSlotSharingGroup);
+
+               assertNotNull(iterationSourceCoLocationGroup);
+               assertNotNull(iterationSinkCoLocationGroup);
+               assertEquals(iterationSourceCoLocationGroup, 
iterationSinkCoLocationGroup);
+       }
 }

Reply via email to