This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25fdf5490db301b33157c99fe830d3497b6d0bdd
Author: Yangze Guo <[email protected]>
AuthorDate: Mon Jun 28 11:07:52 2021 +0800

    [FLINK-21925][core] Introduce fine-grained.shuffle-mode.all-blocking to 
avoid resource deadlock in batch jobs that apply fine-grained resource 
management
    
    This closes #16307
---
 .../apache/flink/configuration/ClusterOptions.java  |  8 ++++++++
 .../api/environment/StreamExecutionEnvironment.java | 21 +++++++++++++++++++++
 .../flink/streaming/api/graph/StreamGraph.java      |  5 +++++
 3 files changed, 34 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index 1bb007b..667dc76 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -123,6 +123,14 @@ public class ClusterOptions {
                     .withDescription(
                             "Defines whether the cluster uses fine-grained 
resource management.");
 
+    // NOTE(review): the consumer of this option in this patch (StreamExecutionEnvironment
+    // #getStreamGraph) only checks an explicitly configured RUNTIME_MODE of BATCH; a job in
+    // AUTOMATIC mode that ends up executing as batch may bypass this safeguard - confirm.
+    /**
+     * Whether to switch every edge of a batch job that uses fine-grained resource management to
+     * BLOCKING. When the configured runtime mode is BATCH and the stream graph declares at least
+     * one slot sharing group resource other than {@code ResourceProfile.UNKNOWN}, {@code true}
+     * makes all edges BLOCKING, while {@code false} (the default) makes building the stream graph
+     * fail with an {@code IllegalConfigurationException}. This works around a possible resource
+     * deadlock with PIPELINE edges; see FLINK-20865.
+     */
+    @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> 
FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING =
+            ConfigOptions.key("fine-grained.shuffle-mode.all-blocking")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to convert all PIPELINE edges to BLOCKING 
when apply fine-grained resource management in batch jobs.");
+
     public static JobManagerOptions.SchedulerType 
getSchedulerType(Configuration configuration) {
         if (isAdaptiveSchedulerEnabled(configuration) || 
isReactiveModeEnabled(configuration)) {
             return JobManagerOptions.SchedulerType.Adaptive;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ba5d107..9670b39 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -52,10 +52,12 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
@@ -91,6 +93,7 @@ import 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
@@ -2099,6 +2102,24 @@ public class StreamExecutionEnvironment {
     @Internal
     public StreamGraph getStreamGraph(String jobName, boolean 
clearTransformations) {
         StreamGraph streamGraph = 
getStreamGraphGenerator().setJobName(jobName).generate();
+
+        // There might be a resource deadlock when applying fine-grained 
resource management in
+        // batch jobs with PIPELINE edges. Users need to trigger the
+        // fine-grained.shuffle-mode.all-blocking to convert all edges to 
BLOCKING before we fix
+        // that issue.
+        if (configuration.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.BATCH
+                && streamGraph.hasFineGrainedResource()) {
+            if 
(configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING)) {
+                
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+            } else {
+                throw new IllegalConfigurationException(
+                        "At the moment, fine-grained resource management 
requires batch workloads to "
+                                + "be executed with types of all edges being 
BLOCKING. To do that, you need to configure '"
+                                + 
ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING.key()
+                                + "' to 'true'. Notice that this may affect 
the performance. See FLINK-20865 for more details.");
+            }
+        }
+
         if (clearTransformations) {
             this.transformations.clear();
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 35f8b45..e3f284f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -253,6 +253,11 @@ public class StreamGraph implements Pipeline {
         return Optional.ofNullable(slotSharingGroupResources.get(groupId));
     }
 
+    /**
+     * Returns whether any slot sharing group registered in this graph has a resource profile other
+     * than {@link ResourceProfile#UNKNOWN}.
+     *
+     * @return {@code true} if at least one value of {@code slotSharingGroupResources} is not
+     *     {@code ResourceProfile.UNKNOWN}; {@code false} otherwise, including when no slot sharing
+     *     group resources are registered (anyMatch on an empty stream yields false)
+     */
+    public boolean hasFineGrainedResource() {
+        return slotSharingGroupResources.values().stream()
+                .anyMatch(resourceProfile -> 
!resourceProfile.equals(ResourceProfile.UNKNOWN));
+    }
+
     /**
      * Set whether to put all vertices into the same slot sharing group by 
default.
      *

Reply via email to