This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9f5fd073bd128006571c9e788c87938125fef52d Author: Till Rohrmann <[email protected]> AuthorDate: Thu Sep 27 14:17:21 2018 +0200 [FLINK-10411] Introduce DispatcherResourceManagerComponentFactory This commit introduces the DispatcherResourceManagerComponentFactory which is used to create a DispatcherResourceManagerComponent. That way, it is possible to eagerly initialize all fields of the DispatcherResourceManagerComponent making it possible to make all fields final and remove the lock. This closes #6743. --- .../entrypoint/ClassPathJobGraphRetriever.java | 2 +- .../entrypoint/StandaloneJobClusterEntryPoint.java | 8 +- .../entrypoint/MesosJobClusterEntrypoint.java | 10 +- .../entrypoint/MesosSessionClusterEntrypoint.java | 17 +- .../runtime/dispatcher/JobDispatcherFactory.java | 2 +- .../runtime/entrypoint/ClusterEntrypoint.java | 10 +- .../SessionDispatcherResourceManagerComponent.java | 34 --- .../StandaloneSessionClusterEntrypoint.java | 6 +- ...DispatcherResourceManagerComponentFactory.java} | 261 +++++++-------------- .../DispatcherResourceManagerComponent.java | 180 ++++++++++++++ .../DispatcherResourceManagerComponentFactory.java | 45 ++++ .../{ => component}/FileJobGraphRetriever.java | 2 +- .../JobDispatcherResourceManagerComponent.java | 28 ++- ...bDispatcherResourceManagerComponentFactory.java | 58 +++++ .../{ => component}/JobGraphRetriever.java | 2 +- .../SessionDispatcherResourceManagerComponent.java | 40 ++++ ...nDispatcherResourceManagerComponentFactory.java | 58 +++++ .../yarn/entrypoint/YarnJobClusterEntrypoint.java | 10 +- .../entrypoint/YarnSessionClusterEntrypoint.java | 8 +- 19 files changed, 522 insertions(+), 259 deletions(-) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 5ded04b..3e0645d 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -23,7 +23,7 @@ import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.runtime.entrypoint.JobGraphRetriever; +import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.util.FlinkException; diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index b81d992..6c42bf2 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -19,11 +19,11 @@ package org.apache.flink.container.entrypoint; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.entrypoint.ClusterComponent; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.FlinkParseException; -import org.apache.flink.runtime.entrypoint.JobClusterComponent; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; @@ -61,8 +61,8 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { } @Override - protected ClusterComponent<?> createClusterComponent(Configuration configuration) { - return new JobClusterComponent( + protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new JobDispatcherResourceManagerComponentFactory( StandaloneResourceManagerFactory.INSTANCE, new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments)); } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java index 922d5cd..377b5b5 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java @@ -28,11 +28,11 @@ import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.entrypoint.ClusterComponent; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever; -import org.apache.flink.runtime.entrypoint.JobClusterComponent; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; @@ -109,8 +109,8 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { } @Override - protected ClusterComponent<?> createClusterComponent(Configuration configuration) { - return new JobClusterComponent( + protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new JobDispatcherResourceManagerComponentFactory( new MesosResourceManagerFactory( mesosServices, schedulerConfiguration, diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java index f691940..9879628 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java @@ -28,10 +28,10 @@ import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.entrypoint.ClusterComponent; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.SessionClusterComponent; import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; @@ -108,12 +108,13 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { } @Override - protected ClusterComponent<?> createClusterComponent(Configuration configuration) { - return new SessionClusterComponent(new MesosResourceManagerFactory( - mesosServices, - mesosConfig, - taskManagerParameters, - taskManagerContainerSpec)); + protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new SessionDispatcherResourceManagerComponentFactory( + new MesosResourceManagerFactory( + mesosServices, + mesosConfig, + taskManagerParameters, + taskManagerContainerSpec)); } public static void main(String[] args) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java index 488f2fc..e6b1a26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.JobGraphRetriever; +import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 54ccaec..c9a1722 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -38,6 +38,8 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.MiniDispatcher; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -109,7 +111,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { private final AtomicBoolean isShutDown = new AtomicBoolean(false); @GuardedBy("lock") - private ClusterComponent<?> clusterComponent; + private DispatcherResourceManagerComponent<?> clusterComponent; @GuardedBy("lock") private MetricRegistryImpl metricRegistry; @@ -204,9 +206,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); - clusterComponent = createClusterComponent(configuration); + final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration); - clusterComponent.startComponent( + clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, commonRpcService, haServices, @@ -460,7 +462,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { // Abstract methods // -------------------------------------------------- - protected abstract ClusterComponent<?> createClusterComponent(Configuration configuration); + protected abstract DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration); protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore( Configuration configuration, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java deleted file mode 100644 index 8ab0701..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.entrypoint; - -import org.apache.flink.runtime.dispatcher.Dispatcher; -import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; -import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; -import org.apache.flink.runtime.rest.SessionRestEndpointFactory; - -/** - * {@link ClusterComponent} used by session clusters. - */ -public class SessionClusterComponent extends ClusterComponent<Dispatcher> { - - public SessionClusterComponent(ResourceManagerFactory<?> resourceManagerFactory) { - super(SessionDispatcherFactory.INSTANCE, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java index 1675235..127fc8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.entrypoint; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -35,8 +37,8 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint } @Override - protected ClusterComponent<?> createClusterComponent(Configuration configuration) { - return new SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE); + protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE); } public static void main(String[] args) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java similarity index 51% rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java index af02729..0a37411 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java @@ -16,14 +16,13 @@ * limitations under the License. */ -package org.apache.flink.runtime.entrypoint; +package org.apache.flink.runtime.entrypoint.component; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; @@ -32,6 +31,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; +import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -46,102 +46,54 @@ import org.apache.flink.runtime.rest.RestEndpointFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; -import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.FlinkException; import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; +import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; /** - * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint} - * in the same process. + * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components. + * + * @param <T> type of the {@link Dispatcher} + * @param <U> type of the {@link RestfulGateway} given to the {@link WebMonitorEndpoint} */ -public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsync { - - private static final Logger LOG = LoggerFactory.getLogger(ClusterComponent.class); +public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> { - private final Object lock = new Object(); + private final Logger log = LoggerFactory.getLogger(getClass()); + @Nonnull private final DispatcherFactory<T> dispatcherFactory; + @Nonnull private final ResourceManagerFactory<?> resourceManagerFactory; - private final RestEndpointFactory<?> restEndpointFactory; - - private final CompletableFuture<Void> terminationFuture; - - private final CompletableFuture<ApplicationStatus> shutDownFuture; - - @GuardedBy("lock") - private State state; - - @GuardedBy("lock") - private ResourceManager<?> resourceManager; + @Nonnull + private final RestEndpointFactory<U> restEndpointFactory; - @GuardedBy("lock") - private T dispatcher; - - @GuardedBy("lock") - private LeaderRetrievalService dispatcherLeaderRetrievalService; - - @GuardedBy("lock") - private LeaderRetrievalService resourceManagerRetrievalService; - - @GuardedBy("lock") - private WebMonitorEndpoint<?> webMonitorEndpoint; - - @GuardedBy("lock") - private JobManagerMetricGroup jobManagerMetricGroup; - - public ClusterComponent( - DispatcherFactory<T> dispatcherFactory, - ResourceManagerFactory<?> resourceManagerFactory, - RestEndpointFactory<?> restEndpointFactory) { + public AbstractDispatcherResourceManagerComponentFactory( + @Nonnull DispatcherFactory<T> dispatcherFactory, + @Nonnull ResourceManagerFactory<?> resourceManagerFactory, + @Nonnull RestEndpointFactory<U> restEndpointFactory) { this.dispatcherFactory = dispatcherFactory; this.resourceManagerFactory = resourceManagerFactory; this.restEndpointFactory = restEndpointFactory; - this.terminationFuture = new CompletableFuture<>(); - this.shutDownFuture = new CompletableFuture<>(); - this.state = State.CREATED; - - terminationFuture.whenComplete( - (aVoid, throwable) -> { - if (throwable != null) { - shutDownFuture.completeExceptionally(throwable); - } else { - shutDownFuture.complete(ApplicationStatus.SUCCEEDED); - } - }); } - public T getDispatcher() { - synchronized (lock) { - return dispatcher; - } - } - - public CompletableFuture<Void> getTerminationFuture() { - return terminationFuture; - } - - public CompletableFuture<ApplicationStatus> getShutDownFuture() { - return shutDownFuture; - } - - public void startComponent( + @Override + public DispatcherResourceManagerComponent<T> create( Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, @@ -150,22 +102,27 @@ public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsyn MetricRegistry metricRegistry, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler) throws Exception { - synchronized (lock) { - Preconditions.checkState(state == State.CREATED); - state = State.RUNNING; + LeaderRetrievalService dispatcherLeaderRetrievalService = null; + LeaderRetrievalService resourceManagerRetrievalService = null; + WebMonitorEndpoint<U> webMonitorEndpoint = null; + ResourceManager<?> resourceManager = null; + JobManagerMetricGroup jobManagerMetricGroup = null; + T dispatcher = null; + + try { dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever(); - LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>( + final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>( rpcService, DispatcherGateway.class, DispatcherId::fromUuid, 10, Time.milliseconds(50L)); - LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>( + final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>( rpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, @@ -186,7 +143,7 @@ public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsyn highAvailabilityServices.getWebMonitorLeaderElectionService(), fatalErrorHandler); - LOG.debug("Starting Dispatcher REST endpoint."); + log.debug("Starting Dispatcher REST endpoint."); webMonitorEndpoint.start(); resourceManager = resourceManagerFactory.createResourceManager( @@ -221,127 +178,77 @@ public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsyn webMonitorEndpoint.getRestBaseUrl(), historyServerArchivist); - registerShutDownFuture(dispatcher, shutDownFuture); - - LOG.debug("Starting ResourceManager."); + log.debug("Starting ResourceManager."); resourceManager.start(); resourceManagerRetrievalService.start(resourceManagerGatewayRetriever); - LOG.debug("Starting Dispatcher."); + log.debug("Starting Dispatcher."); dispatcher.start(); dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever); - } - } - - protected void registerShutDownFuture(T dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) { - dispatcher - .getTerminationFuture() - .whenComplete( - (aVoid, throwable) -> { - if (throwable != null) { - shutDownFuture.completeExceptionally(throwable); - } else { - shutDownFuture.complete(ApplicationStatus.SUCCEEDED); - } - }); - } - - @Override - public CompletableFuture<Void> closeAsync() { - synchronized (lock) { - if (state == State.RUNNING) { - Exception exception = null; - - final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4); - - if (dispatcherLeaderRetrievalService != null) { - try { - dispatcherLeaderRetrievalService.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } - if (resourceManagerRetrievalService != null) { - try { - resourceManagerRetrievalService.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } - - if (webMonitorEndpoint != null) { - terminationFutures.add(webMonitorEndpoint.closeAsync()); + return createDispatcherResourceManagerComponent( + dispatcher, + resourceManager, + dispatcherLeaderRetrievalService, + resourceManagerRetrievalService, + webMonitorEndpoint, + jobManagerMetricGroup); + + } catch (Exception exception) { + // clean up all started components + if (dispatcherLeaderRetrievalService != null) { + try { + dispatcherLeaderRetrievalService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); } + } - if (dispatcher != null) { - dispatcher.shutDown(); - terminationFutures.add(dispatcher.getTerminationFuture()); + if (resourceManagerRetrievalService != null) { + try { + resourceManagerRetrievalService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); } + } - if (resourceManager != null) { - resourceManager.shutDown(); - terminationFutures.add(resourceManager.getTerminationFuture()); - } + final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3); - if (exception != null) { - terminationFutures.add(FutureUtils.completedExceptionally(exception)); - } + if (webMonitorEndpoint != null) { + terminationFutures.add(webMonitorEndpoint.closeAsync()); + } - final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures); + if (resourceManager != null) { + resourceManager.shutDown(); + terminationFutures.add(resourceManager.getTerminationFuture()); + } - final CompletableFuture<Void> metricGroupTerminationFuture; + if (dispatcher != null) { + dispatcher.shutDown(); + terminationFutures.add(dispatcher.getTerminationFuture()); + } - if (jobManagerMetricGroup != null) { - metricGroupTerminationFuture = FutureUtils.runAfterwards( - componentTerminationFuture, - () -> { - synchronized (lock) { - jobManagerMetricGroup.close(); - } - }); - } else { - metricGroupTerminationFuture = componentTerminationFuture; - } + final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures); - metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> { - if (throwable != null) { - terminationFuture.completeExceptionally(throwable); - } else { - terminationFuture.complete(aVoid); - } - }); - } else if (state == State.CREATED) { - terminationFuture.complete(null); + try { + terminationFuture.get(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); } - state = State.TERMINATED; - return terminationFuture; - } - } - - /** - * Deregister the Flink application from the resource management system by signalling - * the {@link ResourceManager}. - * - * @param applicationStatus to terminate the application with - * @param diagnostics additional information about the shut down, can be {@code null} - * @return Future which is completed once the shut down - */ - public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) { - synchronized (lock) { - if (resourceManager != null) { - final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); - return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null); - } else { - return CompletableFuture.completedFuture(null); + if (jobManagerMetricGroup != null) { + jobManagerMetricGroup.close(); } + + throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception); } } - enum State { - CREATED, - RUNNING, - TERMINATED - } + protected abstract DispatcherResourceManagerComponent<T> createDispatcherResourceManagerComponent( + T dispatcher, + ResourceManager<?> resourceManager, + LeaderRetrievalService dispatcherLeaderRetrievalService, + LeaderRetrievalService resourceManagerRetrievalService, + WebMonitorEndpoint<?> webMonitorEndpoint, + JobManagerMetricGroup jobManagerMetricGroup); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java new file mode 100644 index 0000000..94925b2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint} + * in the same process. + */ +public class DispatcherResourceManagerComponent<T extends Dispatcher> implements AutoCloseableAsync { + + @Nonnull + private final T dispatcher; + + @Nonnull + private final ResourceManager<?> resourceManager; + + @Nonnull + private final LeaderRetrievalService dispatcherLeaderRetrievalService; + + @Nonnull + private final LeaderRetrievalService resourceManagerRetrievalService; + + @Nonnull + private final WebMonitorEndpoint<?> webMonitorEndpoint; + + @Nonnull + private final JobManagerMetricGroup jobManagerMetricGroup; + + private final CompletableFuture<Void> terminationFuture; + + private final CompletableFuture<ApplicationStatus> shutDownFuture; + + private final AtomicBoolean isRunning = new AtomicBoolean(true); + + DispatcherResourceManagerComponent( + @Nonnull T dispatcher, + @Nonnull ResourceManager<?> resourceManager, + @Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService, + @Nonnull LeaderRetrievalService resourceManagerRetrievalService, + @Nonnull WebMonitorEndpoint<?> webMonitorEndpoint, + @Nonnull JobManagerMetricGroup jobManagerMetricGroup) { + this.resourceManager = resourceManager; + this.dispatcher = dispatcher; + this.dispatcherLeaderRetrievalService = dispatcherLeaderRetrievalService; + this.resourceManagerRetrievalService = resourceManagerRetrievalService; + this.webMonitorEndpoint = webMonitorEndpoint; + this.jobManagerMetricGroup = jobManagerMetricGroup; + this.terminationFuture = new CompletableFuture<>(); + this.shutDownFuture = new CompletableFuture<>(); + + registerShutDownFuture(); + } + + private void registerShutDownFuture() { + terminationFuture.whenComplete( + (aVoid, throwable) -> { + if (throwable != null) { + shutDownFuture.completeExceptionally(throwable); + } else { + shutDownFuture.complete(ApplicationStatus.SUCCEEDED); + } + }); + + dispatcher + .getTerminationFuture() + .whenComplete( + (aVoid, throwable) -> { + if (throwable != null) { + shutDownFuture.completeExceptionally(throwable); + } else { + shutDownFuture.complete(ApplicationStatus.SUCCEEDED); + } + }); + } + + public CompletableFuture<Void> getTerminationFuture() { + return terminationFuture; + } + + public final CompletableFuture<ApplicationStatus> getShutDownFuture() { + return shutDownFuture; + } + + @Override + public CompletableFuture<Void> closeAsync() { + if (isRunning.compareAndSet(true, false)) { + Exception exception = null; + + final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4); + + try { + dispatcherLeaderRetrievalService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + resourceManagerRetrievalService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + terminationFutures.add(webMonitorEndpoint.closeAsync()); + + dispatcher.shutDown(); + terminationFutures.add(dispatcher.getTerminationFuture()); + + resourceManager.shutDown(); + terminationFutures.add(resourceManager.getTerminationFuture()); + + if (exception != null) { + terminationFutures.add(FutureUtils.completedExceptionally(exception)); + } + + final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures); + + final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards( + componentTerminationFuture, + jobManagerMetricGroup::close); + + metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> { + if (throwable != null) { + terminationFuture.completeExceptionally(throwable); + } else { + terminationFuture.complete(aVoid); + } + }); + } + + return terminationFuture; + } + + /** + * Deregister the Flink application from the resource management system by signalling + * the {@link ResourceManager}. + * + * @param applicationStatus to terminate the application with + * @param diagnostics additional information about the shut down, can be {@code null} + * @return Future which is completed once the shut down + */ + public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) { + final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java new file mode 100644 index 0000000..df22a59 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +/** + * Factory for the {@link DispatcherResourceManagerComponent}. + */ +public interface DispatcherResourceManagerComponentFactory<T extends Dispatcher> { + + DispatcherResourceManagerComponent<T> create( + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + ArchivedExecutionGraphStore archivedExecutionGraphStore, + FatalErrorHandler fatalErrorHandler) throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java index 7a194f6..1848408 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.entrypoint; +package org.apache.flink.runtime.entrypoint.component; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java similarity index 54% rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java index 17583ac..c1df47f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java @@ -16,29 +16,33 @@ * limitations under the License. */ -package org.apache.flink.runtime.entrypoint; +package org.apache.flink.runtime.entrypoint.component; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.dispatcher.JobDispatcherFactory; import org.apache.flink.runtime.dispatcher.MiniDispatcher; -import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; -import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import java.util.concurrent.CompletableFuture; /** - * {@link ClusterComponent} for a job cluster. The dispatcher component starts + * {@link DispatcherResourceManagerComponent} for a job cluster. The dispatcher component starts * a {@link MiniDispatcher}. */ -public class JobClusterComponent extends ClusterComponent<MiniDispatcher> { +class JobDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<MiniDispatcher> { - public JobClusterComponent(ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobActions) { - super(new JobDispatcherFactory(jobActions), resourceManagerFactory, JobRestEndpointFactory.INSTANCE); - } + JobDispatcherResourceManagerComponent( + MiniDispatcher dispatcher, + ResourceManager<?> resourceManager, + LeaderRetrievalService dispatcherLeaderRetrievalService, + LeaderRetrievalService resourceManagerRetrievalService, + WebMonitorEndpoint<?> webMonitorEndpoint, + JobManagerMetricGroup jobManagerMetricGroup) { + super(dispatcher, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint, jobManagerMetricGroup); - @Override - protected void registerShutDownFuture(MiniDispatcher dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) { - super.registerShutDownFuture(dispatcher, shutDownFuture); + final CompletableFuture<ApplicationStatus> shutDownFuture = getShutDownFuture(); dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, throwable) -> { if (throwable != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java new file mode 100644 index 0000000..c7ce14c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.runtime.dispatcher.JobDispatcherFactory; +import org.apache.flink.runtime.dispatcher.MiniDispatcher; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; + +import javax.annotation.Nonnull; + +/** + * {@link DispatcherResourceManagerComponentFactory} for a {@link JobDispatcherResourceManagerComponent}. + */ +public class JobDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory<MiniDispatcher, RestfulGateway> { + + public JobDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory<?> resourceManagerFactory, @Nonnull JobGraphRetriever jobGraphRetriever) { + super(new JobDispatcherFactory(jobGraphRetriever), resourceManagerFactory, JobRestEndpointFactory.INSTANCE); + } + + @Override + protected DispatcherResourceManagerComponent<MiniDispatcher> createDispatcherResourceManagerComponent( + MiniDispatcher dispatcher, + ResourceManager<?> resourceManager, + LeaderRetrievalService dispatcherLeaderRetrievalService, + LeaderRetrievalService resourceManagerRetrievalService, + WebMonitorEndpoint<?> webMonitorEndpoint, + JobManagerMetricGroup jobManagerMetricGroup) { + return new JobDispatcherResourceManagerComponent( + dispatcher, + resourceManager, + dispatcherLeaderRetrievalService, + resourceManagerRetrievalService, + webMonitorEndpoint, + jobManagerMetricGroup); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java similarity index 96% rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java index e2ace15..b1586ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.entrypoint; +package org.apache.flink.runtime.entrypoint.component; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java new file mode 100644 index 0000000..8be7b74 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; + +/** + * {@link DispatcherResourceManagerComponent} used by session clusters. + */ +class SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<Dispatcher> { + SessionDispatcherResourceManagerComponent( + Dispatcher dispatcher, + ResourceManager<?> resourceManager, + LeaderRetrievalService dispatcherLeaderRetrievalService, + LeaderRetrievalService resourceManagerRetrievalService, + WebMonitorEndpoint<?> webMonitorEndpoint, + JobManagerMetricGroup jobManagerMetricGroup) { + super(dispatcher, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint, jobManagerMetricGroup); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java new file mode 100644 index 0000000..c44833d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import org.apache.flink.runtime.rest.SessionRestEndpointFactory; +import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; + +import javax.annotation.Nonnull; + +/** + * {@link DispatcherResourceManagerComponentFactory} for a {@link SessionDispatcherResourceManagerComponent}. + */ +public class SessionDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory<Dispatcher, DispatcherGateway> { + + public SessionDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory<?> resourceManagerFactory) { + super(SessionDispatcherFactory.INSTANCE, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE); + } + + @Override + protected DispatcherResourceManagerComponent<Dispatcher> createDispatcherResourceManagerComponent( + Dispatcher dispatcher, + ResourceManager<?> resourceManager, + LeaderRetrievalService dispatcherLeaderRetrievalService, + LeaderRetrievalService resourceManagerRetrievalService, + WebMonitorEndpoint<?> webMonitorEndpoint, + JobManagerMetricGroup jobManagerMetricGroup) { + return new SessionDispatcherResourceManagerComponent( + dispatcher, + resourceManager, + dispatcherLeaderRetrievalService, + resourceManagerRetrievalService, + webMonitorEndpoint, + jobManagerMetricGroup); + } +} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java index 1733f49..42e666e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java @@ -19,11 +19,11 @@ package org.apache.flink.yarn.entrypoint; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.entrypoint.ClusterComponent; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever; -import org.apache.flink.runtime.entrypoint.JobClusterComponent; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -62,8 +62,8 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint { } @Override - protected ClusterComponent<?> createClusterComponent(Configuration configuration) { - return new JobClusterComponent( + protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new JobDispatcherResourceManagerComponentFactory( YarnResourceManagerFactory.INSTANCE, FileJobGraphRetriever.createFrom(configuration)); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java index e0bebfd..0f4656e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java @@ -19,10 +19,10 @@ package org.apache.flink.yarn.entrypoint; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.entrypoint.ClusterComponent; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.SessionClusterComponent; import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -60,8 +60,8 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint { } @Override - protected ClusterComponent<?> createClusterComponent(Configuration configuration) { - return new SessionClusterComponent(YarnResourceManagerFactory.INSTANCE); + protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE); } public static void main(String[] args) {
