This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81e1db3c439c1758dccd1a20f2f6b70120f48ef7
Author: kevin.cyj <[email protected]>
AuthorDate: Fri Jul 2 14:26:06 2021 +0800

    [FLINK-23214][runtime] Make ShuffleMaster a cluster level shared service
    
    This patch makes ShuffleMaster a cluster level shared service, which makes
    it consistent with the ShuffleEnvironment.
---
 .../JobMasterServiceLeadershipRunnerFactory.java      |  7 -------
 .../runtime/jobmaster/JobManagerSharedServices.java   | 19 +++++++++++++++++--
 .../factories/DefaultJobMasterServiceFactory.java     |  3 +--
 .../TestingJobManagerSharedServicesBuilder.java       | 12 +++++++++++-
 4 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
index 9fb2c7c..ea1f2c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
@@ -38,8 +38,6 @@ import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -83,10 +81,6 @@ public enum JobMasterServiceLeadershipRunnerFactory 
implements JobManagerRunnerF
                     "Adaptive Scheduler is required for reactive mode");
         }
 
-        final ShuffleMaster<?> shuffleMaster =
-                ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
-                        .createShuffleMaster(configuration);
-
         final LibraryCacheManager.ClassLoaderLease classLoaderLease =
                 jobManagerServices
                         .getLibraryCacheManager()
@@ -111,7 +105,6 @@ public enum JobMasterServiceLeadershipRunnerFactory 
implements JobManagerRunnerF
                         jobManagerJobMetricGroupFactory,
                         fatalErrorHandler,
                         userCodeClassLoader,
-                        shuffleMaster,
                         initializationTimestamp);
 
         final DefaultJobMasterServiceProcessFactory 
jobMasterServiceProcessFactory =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index 8d092eb..b46e1e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -47,15 +49,19 @@ public class JobManagerSharedServices {
 
     private final LibraryCacheManager libraryCacheManager;
 
+    private final ShuffleMaster<?> shuffleMaster;
+
     @Nonnull private final BlobWriter blobWriter;
 
     public JobManagerSharedServices(
             ScheduledExecutorService scheduledExecutorService,
             LibraryCacheManager libraryCacheManager,
+            ShuffleMaster<?> shuffleMaster,
             @Nonnull BlobWriter blobWriter) {
 
         this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
         this.libraryCacheManager = checkNotNull(libraryCacheManager);
+        this.shuffleMaster = checkNotNull(shuffleMaster);
         this.blobWriter = blobWriter;
     }
 
@@ -67,6 +73,10 @@ public class JobManagerSharedServices {
         return libraryCacheManager;
     }
 
+    public ShuffleMaster<?> getShuffleMaster() {
+        return shuffleMaster;
+    }
+
     @Nonnull
     public BlobWriter getBlobWriter() {
         return blobWriter;
@@ -103,7 +113,8 @@ public class JobManagerSharedServices {
     // ------------------------------------------------------------------------
 
     public static JobManagerSharedServices fromConfiguration(
-            Configuration config, BlobServer blobServer, FatalErrorHandler 
fatalErrorHandler) {
+            Configuration config, BlobServer blobServer, FatalErrorHandler 
fatalErrorHandler)
+            throws Exception {
 
         checkNotNull(config);
         checkNotNull(blobServer);
@@ -133,6 +144,10 @@ public class JobManagerSharedServices {
                         Hardware.getNumberCPUCores(),
                         new ExecutorThreadFactory("jobmanager-future"));
 
-        return new JobManagerSharedServices(futureExecutor, 
libraryCacheManager, blobServer);
+        final ShuffleMaster<?> shuffleMaster =
+                
ShuffleServiceLoader.loadShuffleServiceFactory(config).createShuffleMaster(config);
+
+        return new JobManagerSharedServices(
+                futureExecutor, libraryCacheManager, shuffleMaster, 
blobServer);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index 637bdc5..bcde4b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -69,7 +69,6 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
             JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
             FatalErrorHandler fatalErrorHandler,
             ClassLoader userCodeClassloader,
-            ShuffleMaster<?> shuffleMaster,
             long initializationTimestamp) {
         this.executor = executor;
         this.rpcService = rpcService;
@@ -82,7 +81,7 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
         this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory;
         this.fatalErrorHandler = fatalErrorHandler;
         this.userCodeClassloader = userCodeClassloader;
-        this.shuffleMaster = shuffleMaster;
+        this.shuffleMaster = jobManagerSharedServices.getShuffleMaster();
         this.initializationTimestamp = initializationTimestamp;
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index 3ab21fa..d623fb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import 
org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
 import org.apache.flink.runtime.testutils.TestingUtils;
 
 import java.util.concurrent.ScheduledExecutorService;
@@ -33,11 +35,14 @@ public class TestingJobManagerSharedServicesBuilder {
 
     private LibraryCacheManager libraryCacheManager;
 
+    private ShuffleMaster<?> shuffleMaster;
+
     private BlobWriter blobWriter;
 
     public TestingJobManagerSharedServicesBuilder() {
         scheduledExecutorService = TestingUtils.defaultExecutor();
         libraryCacheManager = ContextClassLoaderLibraryCacheManager.INSTANCE;
+        shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
         blobWriter = VoidBlobWriter.getInstance();
     }
 
@@ -47,6 +52,11 @@ public class TestingJobManagerSharedServicesBuilder {
         return this;
     }
 
+    public TestingJobManagerSharedServicesBuilder 
setShuffleMaster(ShuffleMaster<?> shuffleMaster) {
+        this.shuffleMaster = shuffleMaster;
+        return this;
+    }
+
     public TestingJobManagerSharedServicesBuilder setLibraryCacheManager(
             LibraryCacheManager libraryCacheManager) {
         this.libraryCacheManager = libraryCacheManager;
@@ -59,6 +69,6 @@ public class TestingJobManagerSharedServicesBuilder {
 
     public JobManagerSharedServices build() {
         return new JobManagerSharedServices(
-                scheduledExecutorService, libraryCacheManager, blobWriter);
+                scheduledExecutorService, libraryCacheManager, shuffleMaster, 
blobWriter);
     }
 }

Reply via email to