This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e86133703bbc9f7876a0534c584be11ef120fd8
Author: Yangze Guo <[email protected]>
AuthorDate: Fri Jun 25 14:53:24 2021 +0800

    [FLINK-21925][core] Introduce #slotSharingGroup(SlotSharingGroup) for 
configuring slot sharing group with its resource
---
 .../org/apache/flink/api/dag/Transformation.java   | 31 +++++++++---
 .../apache/flink/python/util/PythonConfigUtil.java |  4 +-
 .../streaming/api/datastream/DataStreamSink.java   | 19 ++++++++
 .../api/datastream/SingleOutputStreamOperator.java | 19 ++++++++
 .../streaming/api/graph/StreamGraphGenerator.java  | 46 +++++++++++++++--
 .../api/graph/StreamGraphGeneratorTest.java        | 57 ++++++++++++++++++++++
 6 files changed, 163 insertions(+), 13 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java 
b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index 0334b07..915bafb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.dag;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
@@ -167,7 +168,7 @@ public abstract class Transformation<T> {
 
     protected long bufferTimeout = -1;
 
-    private String slotSharingGroup;
+    private Optional<SlotSharingGroup> slotSharingGroup;
 
     @Nullable private String coLocationGroupKey;
 
@@ -184,7 +185,7 @@ public abstract class Transformation<T> {
         this.name = Preconditions.checkNotNull(name);
         this.outputType = outputType;
         this.parallelism = parallelism;
-        this.slotSharingGroup = null;
+        this.slotSharingGroup = Optional.empty();
     }
 
     /** Returns the unique ID of this {@code Transformation}. */
@@ -390,11 +391,11 @@ public abstract class Transformation<T> {
     }
 
     /**
-     * Returns the slot sharing group of this transformation.
+     * Returns the slot sharing group of this transformation if present.
      *
-     * @see #setSlotSharingGroup(String)
+     * @see #setSlotSharingGroup(SlotSharingGroup)
      */
-    public String getSlotSharingGroup() {
+    public Optional<SlotSharingGroup> getSlotSharingGroup() {
         return slotSharingGroup;
     }
 
@@ -405,10 +406,24 @@ public abstract class Transformation<T> {
      * <p>Initially, an operation is in the default slot sharing group. This 
can be explicitly set
      * using {@code setSlotSharingGroup("default")}.
      *
-     * @param slotSharingGroup The slot sharing group name.
+     * @param slotSharingGroupName The slot sharing group's name.
      */
-    public void setSlotSharingGroup(String slotSharingGroup) {
-        this.slotSharingGroup = slotSharingGroup;
+    public void setSlotSharingGroup(String slotSharingGroupName) {
+        this.slotSharingGroup =
+                
Optional.of(SlotSharingGroup.newBuilder(slotSharingGroupName).build());
+    }
+
+    /**
+     * Sets the slot sharing group of this transformation. Parallel instances 
of operations that are
+     * in the same slot sharing group will be co-located in the same 
TaskManager slot, if possible.
+     *
+     * <p>Initially, an operation is in the default slot sharing group. This can be explicitly set
+     * by constructing a {@link SlotSharingGroup} with name {@code "default"}.
+     *
+     * @param slotSharingGroup the slot sharing group, which contains its name and resource spec.
+     */
+    public void setSlotSharingGroup(SlotSharingGroup slotSharingGroup) {
+        this.slotSharingGroup = Optional.of(slotSharingGroup);
     }
 
     /**
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 8181266..cdba06e 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -215,7 +215,9 @@ public class PythonConfigUtil {
 
     private static void chainTransformation(
             Transformation<?> firstTransformation, Transformation<?> 
secondTransformation) {
-        
firstTransformation.setSlotSharingGroup(secondTransformation.getSlotSharingGroup());
+        secondTransformation
+                .getSlotSharingGroup()
+                .ifPresent(firstTransformation::setSlotSharingGroup);
         
firstTransformation.setCoLocationGroupKey(secondTransformation.getCoLocationGroupKey());
         
firstTransformation.setParallelism(secondTransformation.getParallelism());
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index a3da7c6..eea988e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -205,4 +206,22 @@ public class DataStreamSink<T> {
         transformation.setSlotSharingGroup(slotSharingGroup);
         return this;
     }
+
+    /**
+     * Sets the slot sharing group of this operation. Parallel instances of 
operations that are in
+     * the same slot sharing group will be co-located in the same TaskManager 
slot, if possible.
+     *
+     * <p>Operations inherit the slot sharing group of input operations if all 
input operations are
+     * in the same slot sharing group and no slot sharing group was explicitly 
specified.
+     *
+     * <p>Initially an operation is in the default slot sharing group. An 
operation can be put into
+     * the default group explicitly by setting the slot sharing group with 
name {@code "default"}.
+     *
+     * @param slotSharingGroup the slot sharing group, which contains its name and resource spec.
+     */
+    @PublicEvolving
+    public DataStreamSink<T> slotSharingGroup(SlotSharingGroup 
slotSharingGroup) {
+        transformation.setSlotSharingGroup(slotSharingGroup);
+        return this;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 6b77c24..7b64d21 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -374,6 +375,24 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
     }
 
     /**
+     * Sets the slot sharing group of this operation. Parallel instances of 
operations that are in
+     * the same slot sharing group will be co-located in the same TaskManager 
slot, if possible.
+     *
+     * <p>Operations inherit the slot sharing group of input operations if all 
input operations are
+     * in the same slot sharing group and no slot sharing group was explicitly 
specified.
+     *
+     * <p>Initially an operation is in the default slot sharing group. An 
operation can be put into
+     * the default group explicitly by setting the slot sharing group with 
name {@code "default"}.
+     *
+     * @param slotSharingGroup the slot sharing group, which contains its name and resource spec.
+     */
+    @PublicEvolving
+    public SingleOutputStreamOperator<T> slotSharingGroup(SlotSharingGroup 
slotSharingGroup) {
+        transformation.setSlotSharingGroup(slotSharingGroup);
+        return this;
+    }
+
+    /**
      * Gets the {@link DataStream} that contains the elements that are emitted 
from an operation
      * into the side output with the given {@link OutputTag}.
      *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 5a1d26c..d39fd1f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -22,11 +22,14 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -141,7 +144,7 @@ public class StreamGraphGenerator {
 
     private final ReadableConfig configuration;
 
-    // Records the slot sharing groups and their corresponding ResourceProfile
+    // Records the slot sharing groups and their corresponding fine-grained 
ResourceProfile
     private final Map<String, ResourceProfile> slotSharingGroupResources = new 
HashMap<>();
 
     private Path savepointDir;
@@ -293,7 +296,12 @@ public class StreamGraphGenerator {
      */
     public StreamGraphGenerator setSlotSharingGroupResource(
             Map<String, ResourceProfile> slotSharingGroupResources) {
-        this.slotSharingGroupResources.putAll(slotSharingGroupResources);
+        slotSharingGroupResources.forEach(
+                (name, profile) -> {
+                    if (!profile.equals(ResourceProfile.UNKNOWN)) {
+                        this.slotSharingGroupResources.put(name, profile);
+                    }
+                });
         return this;
     }
 
@@ -312,6 +320,8 @@ public class StreamGraphGenerator {
             transform(transformation);
         }
 
+        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
+
         for (StreamNode node : streamGraph.getStreamNodes()) {
             if 
(node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing))
 {
                 for (StreamEdge edge : node.getInEdges()) {
@@ -342,7 +352,6 @@ public class StreamGraphGenerator {
         graph.setTimeCharacteristic(timeCharacteristic);
         graph.setJobName(jobName);
         graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : 
JobType.STREAMING);
-        graph.setSlotSharingGroupResource(slotSharingGroupResources);
 
         if (shouldExecuteInBatchMode) {
 
@@ -452,6 +461,33 @@ public class StreamGraphGenerator {
             }
         }
 
+        transform
+                .getSlotSharingGroup()
+                .ifPresent(
+                        slotSharingGroup -> {
+                            final ResourceSpec resourceSpec =
+                                    
SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
+                            if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
+                                slotSharingGroupResources.compute(
+                                        slotSharingGroup.getName(),
+                                        (name, profile) -> {
+                                            if (profile == null) {
+                                                return 
ResourceProfile.fromResourceSpec(
+                                                        resourceSpec, 
MemorySize.ZERO);
+                                            } else if 
(!ResourceProfile.fromResourceSpec(
+                                                            resourceSpec, 
MemorySize.ZERO)
+                                                    .equals(profile)) {
+                                                throw new 
IllegalArgumentException(
+                                                        "The slot sharing 
group "
+                                                                + 
slotSharingGroup.getName()
+                                                                + " has been 
configured with two different resource spec.");
+                                            } else {
+                                                return profile;
+                                            }
+                                        });
+                            }
+                        });
+
         // call at least once to trigger exceptions about MissingTypeInfo
         transform.getOutputType();
 
@@ -720,7 +756,9 @@ public class StreamGraphGenerator {
 
         final String slotSharingGroup =
                 determineSlotSharingGroup(
-                        transform.getSlotSharingGroup(),
+                        transform.getSlotSharingGroup().isPresent()
+                                ? 
transform.getSlotSharingGroup().get().getName()
+                                : null,
                         allInputIds.stream()
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()));
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 23e3807..4a59ec4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -749,6 +750,62 @@ public class StreamGraphGeneratorTest extends TestLogger {
                 equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint1")));
     }
 
+    @Test
+    public void testConfigureSlotSharingGroupResource() {
+        final SlotSharingGroup ssg1 =
+                
SlotSharingGroup.newBuilder("ssg1").setCpuCores(1).setTaskHeapMemoryMB(100).build();
+        final SlotSharingGroup ssg2 =
+                
SlotSharingGroup.newBuilder("ssg2").setCpuCores(2).setTaskHeapMemoryMB(200).build();
+        final SlotSharingGroup ssg3 =
+                
SlotSharingGroup.newBuilder(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                        .setCpuCores(3)
+                        .setTaskHeapMemoryMB(300)
+                        .build();
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final DataStream<Integer> source = 
env.fromElements(1).slotSharingGroup("ssg1");
+        source.map(value -> value)
+                .slotSharingGroup(ssg2)
+                .map(value -> value * 2)
+                .map(value -> value * 3)
+                .slotSharingGroup(SlotSharingGroup.newBuilder("ssg4").build())
+                .map(value -> value * 4)
+                .slotSharingGroup(ssg3)
+                .addSink(new DiscardingSink<>())
+                .slotSharingGroup(ssg1);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertThat(
+                streamGraph.getSlotSharingGroupResource("ssg1").get(),
+                is(ResourceProfile.fromResources(1, 100)));
+        assertThat(
+                streamGraph.getSlotSharingGroupResource("ssg2").get(),
+                is(ResourceProfile.fromResources(2, 200)));
+        assertThat(
+                streamGraph
+                        .getSlotSharingGroupResource(
+                                
StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+                        .get(),
+                is(ResourceProfile.fromResources(3, 300)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConflictSlotSharingGroup() {
+        final SlotSharingGroup ssg =
+                
SlotSharingGroup.newBuilder("ssg").setCpuCores(1).setTaskHeapMemoryMB(100).build();
+        final SlotSharingGroup ssgConflict =
+                
SlotSharingGroup.newBuilder("ssg").setCpuCores(2).setTaskHeapMemoryMB(200).build();
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final DataStream<Integer> source = 
env.fromElements(1).slotSharingGroup(ssg);
+        source.map(value -> value)
+                .slotSharingGroup(ssgConflict)
+                .addSink(new DiscardingSink<>())
+                .slotSharingGroup(ssgConflict);
+
+        env.getStreamGraph();
+    }
+
     private static class OutputTypeConfigurableFunction<T>
             implements OutputTypeConfigurable<T>, Function {
         private TypeInformation<T> typeInformation;

Reply via email to