This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 81e1db3c439c1758dccd1a20f2f6b70120f48ef7 Author: kevin.cyj <[email protected]> AuthorDate: Fri Jul 2 14:26:06 2021 +0800 [FLINK-23214][runtime] Make ShuffleMaster a cluster level shared service This patch makes ShuffleMaster a cluster level shared service, which makes it consistent with the ShuffleEnvironment. --- .../JobMasterServiceLeadershipRunnerFactory.java | 7 ------- .../runtime/jobmaster/JobManagerSharedServices.java | 19 +++++++++++++++++-- .../factories/DefaultJobMasterServiceFactory.java | 3 +-- .../TestingJobManagerSharedServicesBuilder.java | 12 +++++++++++- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java index 9fb2c7c..ea1f2c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java @@ -38,8 +38,6 @@ import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.shuffle.ShuffleMaster; -import org.apache.flink.runtime.shuffle.ShuffleServiceLoader; import org.apache.flink.util.Preconditions; import static org.apache.flink.util.Preconditions.checkArgument; @@ -83,10 +81,6 @@ public enum JobMasterServiceLeadershipRunnerFactory implements JobManagerRunnerF "Adaptive Scheduler is required for reactive mode"); } - final ShuffleMaster<?> shuffleMaster = - ShuffleServiceLoader.loadShuffleServiceFactory(configuration) - .createShuffleMaster(configuration); - final LibraryCacheManager.ClassLoaderLease classLoaderLease = jobManagerServices .getLibraryCacheManager() @@ -111,7 +105,6 @@ public enum JobMasterServiceLeadershipRunnerFactory implements JobManagerRunnerF jobManagerJobMetricGroupFactory, fatalErrorHandler, userCodeClassLoader, - shuffleMaster, initializationTimestamp); final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java index 8d092eb..b46e1e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.shuffle.ShuffleServiceLoader; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -47,15 +49,19 @@ public class JobManagerSharedServices { private final LibraryCacheManager libraryCacheManager; + private final ShuffleMaster<?> shuffleMaster; + @Nonnull private final BlobWriter blobWriter; public JobManagerSharedServices( ScheduledExecutorService scheduledExecutorService, LibraryCacheManager libraryCacheManager, + ShuffleMaster<?> shuffleMaster, @Nonnull BlobWriter blobWriter) { this.scheduledExecutorService = checkNotNull(scheduledExecutorService); this.libraryCacheManager = checkNotNull(libraryCacheManager); + this.shuffleMaster = checkNotNull(shuffleMaster); this.blobWriter = blobWriter; } @@ -67,6 +73,10 @@ public class JobManagerSharedServices { return libraryCacheManager; } + public ShuffleMaster<?> getShuffleMaster() { + return shuffleMaster; + } + @Nonnull public BlobWriter getBlobWriter() { return blobWriter; @@ -103,7 +113,8 @@ public class JobManagerSharedServices { // ------------------------------------------------------------------------ public static JobManagerSharedServices fromConfiguration( - Configuration config, BlobServer blobServer, FatalErrorHandler fatalErrorHandler) { + Configuration config, BlobServer blobServer, FatalErrorHandler fatalErrorHandler) + throws Exception { checkNotNull(config); checkNotNull(blobServer); @@ -133,6 +144,10 @@ public class JobManagerSharedServices { Hardware.getNumberCPUCores(), new ExecutorThreadFactory("jobmanager-future")); - return new JobManagerSharedServices(futureExecutor, libraryCacheManager, blobServer); + final ShuffleMaster<?> shuffleMaster = + ShuffleServiceLoader.loadShuffleServiceFactory(config).createShuffleMaster(config); + + return new JobManagerSharedServices( + futureExecutor, libraryCacheManager, shuffleMaster, blobServer); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java index 637bdc5..bcde4b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java @@ -69,7 +69,6 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory { JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, ClassLoader userCodeClassloader, - ShuffleMaster<?> shuffleMaster, long initializationTimestamp) { this.executor = executor; this.rpcService = rpcService; @@ -82,7 +81,7 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory { this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory; this.fatalErrorHandler = fatalErrorHandler; this.userCodeClassloader = userCodeClassloader; - this.shuffleMaster = shuffleMaster; + this.shuffleMaster = jobManagerSharedServices.getShuffleMaster(); this.initializationTimestamp = initializationTimestamp; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java index 3ab21fa..d623fb8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java @@ -22,6 +22,8 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.shuffle.ShuffleTestUtils; import org.apache.flink.runtime.testutils.TestingUtils; import java.util.concurrent.ScheduledExecutorService; @@ -33,11 +35,14 @@ public class TestingJobManagerSharedServicesBuilder { private LibraryCacheManager libraryCacheManager; + private ShuffleMaster<?> shuffleMaster; + private BlobWriter blobWriter; public TestingJobManagerSharedServicesBuilder() { scheduledExecutorService = TestingUtils.defaultExecutor(); libraryCacheManager = ContextClassLoaderLibraryCacheManager.INSTANCE; + shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER; blobWriter = VoidBlobWriter.getInstance(); } @@ -47,6 +52,11 @@ public class TestingJobManagerSharedServicesBuilder { return this; } + public TestingJobManagerSharedServicesBuilder setShuffleMaster(ShuffleMaster<?> shuffleMaster) { + this.shuffleMaster = shuffleMaster; + return this; + } + public TestingJobManagerSharedServicesBuilder setLibraryCacheManager( LibraryCacheManager libraryCacheManager) { this.libraryCacheManager = libraryCacheManager; @@ -59,6 +69,6 @@ public class TestingJobManagerSharedServicesBuilder { public JobManagerSharedServices build() { return new JobManagerSharedServices( - scheduledExecutorService, libraryCacheManager, blobWriter); + scheduledExecutorService, libraryCacheManager, shuffleMaster, blobWriter); } }
