This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3dd57624b2df7a856352a64a89af8606ae0c61a4 Author: Kostas Kloudas <[email protected]> AuthorDate: Mon Oct 19 09:49:14 2020 +0200 [FLINK-19154] ApplicationDispatcherBootstrap cleans up HA data only on FAILED, CANCELLED, SUCCEEDED Depending on the status with which a job got terminated, we may want to shut down the cluster and clean up all HA data, or not. To be able to differentiate between the different termination reasons, we add the ApplicationFailureException. In addition, to be able to shut down the cluster without cleaning up the HA data, we need to be able to terminate the dispatcher's shutdown future with an exception. This is what the new error handler passed to the ApplicationDispatcherBootstrap does. We chose to pass the FatalErrorHandler as a constructor argument because this allows for more robust code. --- .../ApplicationDispatcherBootstrap.java | 68 +++++++--------- ...ApplicationDispatcherGatewayServiceFactory.java | 6 +- .../application/ApplicationFailureException.java | 82 +++++++++++++++++++ .../ApplicationDispatcherBootstrapTest.java | 93 +++++++++++++--------- .../flink/runtime/dispatcher/Dispatcher.java | 11 ++- .../runtime/dispatcher/DispatcherFactory.java | 11 ++- .../runtime/dispatcher/JobDispatcherFactory.java | 7 +- .../flink/runtime/dispatcher/MiniDispatcher.java | 6 +- .../dispatcher/SessionDispatcherFactory.java | 7 +- .../runtime/dispatcher/StandaloneDispatcher.java | 7 +- .../DefaultDispatcherGatewayServiceFactory.java | 2 +- .../dispatcher/DispatcherResourceCleanupTest.java | 2 +- .../flink/runtime/dispatcher/DispatcherTest.java | 15 ++-- .../runtime/dispatcher/MiniDispatcherTest.java | 2 +- .../runtime/dispatcher/TestingDispatcher.java | 5 +- .../runner/DefaultDispatcherRunnerITCase.java | 6 +- 16 files changed, 218 insertions(+), 112 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 2b4ddf3..e581e29 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -30,7 +30,6 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; -import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -42,8 +41,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.SerializedThrowable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +51,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ScheduledFuture; @@ -85,6 +83,8 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap private final Configuration configuration; + private final FatalErrorHandler errorHandler; + private CompletableFuture<Void> applicationCompletionFuture; private ScheduledFuture<?> applicationExecutionTask; @@ -92,10 +92,12 @@ public class 
ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap public ApplicationDispatcherBootstrap( final PackagedProgram application, final Collection<JobGraph> recoveredJobs, - final Configuration configuration) { + final Configuration configuration, + final FatalErrorHandler errorHandler) { this.configuration = checkNotNull(configuration); this.recoveredJobs = checkNotNull(recoveredJobs); this.application = checkNotNull(application); + this.errorHandler = checkNotNull(errorHandler); } @Override @@ -138,28 +140,28 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap return applicationCompletionFuture .handle((r, t) -> { - final ApplicationStatus applicationStatus; if (t != null) { - final Optional<JobCancellationException> cancellationException = - ExceptionUtils.findThrowable(t, JobCancellationException.class); - - if (cancellationException.isPresent()) { - // this means the Flink Job was cancelled - applicationStatus = ApplicationStatus.CANCELED; - } else if (t instanceof CancellationException) { - // this means that the future was cancelled - applicationStatus = ApplicationStatus.UNKNOWN; - } else { - applicationStatus = ApplicationStatus.FAILED; + final Optional<ApplicationFailureException> exception = + ExceptionUtils.findThrowable(t, ApplicationFailureException.class); + + if (exception.isPresent()) { + final ApplicationStatus applicationStatus = exception.get().getStatus(); + + if (applicationStatus == ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) { + LOG.info("Application {}: ", applicationStatus, t); + return dispatcher.shutDownCluster(applicationStatus); + } } - LOG.warn("Application {}: ", applicationStatus, t); - } else { - applicationStatus = ApplicationStatus.SUCCEEDED; - LOG.info("Application completed SUCCESSFULLY"); + LOG.warn("Exiting with Application Status UNKNOWN: ", t); + this.errorHandler.onFatalError(t); + + return FutureUtils.<Acknowledge>completedExceptionally(t); } - 
return dispatcher.shutDownCluster(applicationStatus); + + LOG.info("Application completed SUCCESSFULLY"); + return dispatcher.shutDownCluster(ApplicationStatus.SUCCEEDED); }) .thenCompose(Function.identity()); } @@ -275,31 +277,15 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap * Otherwise, this returns a future that is finished exceptionally (potentially with an * exception from the {@link JobResult}. */ - private CompletableFuture<JobResult> unwrapJobResultException( - CompletableFuture<JobResult> jobResult) { + private CompletableFuture<JobResult> unwrapJobResultException(final CompletableFuture<JobResult> jobResult) { return jobResult.thenApply(result -> { if (result.isSuccess()) { return result; } - Optional<SerializedThrowable> serializedThrowable = result.getSerializedThrowable(); - - if (result.getApplicationStatus() == ApplicationStatus.CANCELED) { - throw new CompletionException( - new JobCancellationException( - result.getJobId(), - "Job was cancelled.", - serializedThrowable.orElse(null))); - } - - if (serializedThrowable.isPresent()) { - Throwable throwable = - serializedThrowable - .get() - .deserializeError(application.getUserCodeClassLoader()); - throw new CompletionException(throwable); - } - throw new RuntimeException("Job execution failed for unknown reason."); + throw new CompletionException( + ApplicationFailureException.fromJobResult( + result, application.getUserCodeClassLoader())); }); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java index fdac2ec..1389b34 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java +++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.dispatcher.Dispatcher; -import org.apache.flink.runtime.dispatcher.DispatcherBootstrap; import org.apache.flink.runtime.dispatcher.DispatcherFactory; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; @@ -78,15 +77,12 @@ public class ApplicationDispatcherGatewayServiceFactory implements AbstractDispa Collection<JobGraph> recoveredJobs, JobGraphWriter jobGraphWriter) { - final DispatcherBootstrap bootstrap = - new ApplicationDispatcherBootstrap(application, recoveredJobs, configuration); - final Dispatcher dispatcher; try { dispatcher = dispatcherFactory.createDispatcher( rpcService, fencingToken, - bootstrap, + errorHandler -> new ApplicationDispatcherBootstrap(application, recoveredJobs, configuration, errorHandler), PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationFailureException.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationFailureException.java new file mode 100644 index 0000000..7bb8719 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationFailureException.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.util.FlinkException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Exception that signals the failure of an + * application with a given {@link ApplicationStatus}. 
+ */ +@Internal +public class ApplicationFailureException extends FlinkException { + + private final JobID jobID; + + private final ApplicationStatus status; + + public ApplicationFailureException( + final JobID jobID, + final ApplicationStatus status, + final String message, + final Throwable cause) { + super(message, cause); + this.jobID = jobID; + this.status = checkNotNull(status); + } + + public JobID getJobID() { + return jobID; + } + + public ApplicationStatus getStatus() { + return status; + } + + public static ApplicationFailureException fromJobResult( + final JobResult result, + final ClassLoader userClassLoader) { + + checkState(result != null && !result.isSuccess()); + checkNotNull(userClassLoader); + + final JobID jobID = result.getJobId(); + final ApplicationStatus status = result.getApplicationStatus(); + + final Throwable throwable = result + .getSerializedThrowable() + .map(ser -> ser.deserializeError(userClassLoader)) + .orElse(new FlinkException("Unknown reason.")); + + if (status == ApplicationStatus.CANCELED || status == ApplicationStatus.FAILED) { + return new ApplicationFailureException( + jobID, status, "Application Status: " + status.name(), throwable); + } + + return new ApplicationFailureException( + jobID, ApplicationStatus.UNKNOWN, "Job failed for unknown reason.", throwable); + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java index 3b7aa2e..0d0fc03 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import 
org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; @@ -53,6 +54,7 @@ import java.net.MalformedURLException; import java.util.Collection; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executors; @@ -61,6 +63,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -233,7 +236,17 @@ public class ApplicationDispatcherBootstrapTest { final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2); - assertException(applicationFuture, JobExecutionException.class); + try { + applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + fail("Expected exception but the execution went smoothly."); + } catch (Throwable e) { + Optional<ApplicationFailureException> expectedException = + ExceptionUtils.findThrowable(e, ApplicationFailureException.class); + if (!expectedException.isPresent()) { + throw e; + } + assertEquals(expectedException.get().getStatus(), ApplicationStatus.FAILED); + } } @Test @@ -301,7 +314,11 @@ public class ApplicationDispatcherBootstrapTest { .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())) .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING)); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); + // we're "listening" on this to be completed to verify that the error handler is 
called. + // In production, this will shut down the cluster with an exception. + final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( + 3, errorHandlerFuture::completeExceptionally); final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); @@ -310,66 +327,61 @@ public class ApplicationDispatcherBootstrapTest { bootstrap.stop(); - // wait until the bootstrap "thinks" it's done - shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + // we call the error handler + assertException(errorHandlerFuture, CancellationException.class); + + // we return a future that is completed exceptionally + assertException(shutdownFuture, CancellationException.class); // verify that the application task is being cancelled assertThat(applicationExecutionFuture.isCancelled(), is(true)); } @Test - public void testClusterShutdownWhenStoppingBootstrap() throws Exception { - // we're "listening" on this to be completed to verify that the cluster - // is being shut down from the ApplicationDispatcherBootstrap - final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>(); - + public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws Exception { final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder() .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())) - .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING)) - .setClusterShutdownFunction((status) -> { - externalShutdownFuture.complete(status); - return CompletableFuture.completedFuture(Acknowledge.get()); - }); + .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING)); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(2); + 
// we're "listening" on this to be completed to verify that the error handler is called. + // In production, this will shut down the cluster with an exception. + final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( + 2, errorHandlerFuture::completeExceptionally); final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); bootstrap.stop(); - // wait until the bootstrap "thinks" it's done - shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + // we call the error handler + assertException(errorHandlerFuture, CancellationException.class); - // verify that the dispatcher is actually being shut down - assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.UNKNOWN)); + // we return a future that is completed exceptionally + assertException(shutdownFuture, CancellationException.class); } @Test - public void testClusterShutdownWhenSubmissionFails() throws Exception { - // we're "listening" on this to be completed to verify that the cluster - // is being shut down from the ApplicationDispatcherBootstrap - final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>(); - + public void testErrorHandlerIsCalledWhenSubmissionFails() throws Exception { final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder() .setSubmitFunction(jobGraph -> { throw new FlinkRuntimeException("Nope!"); - }) - .setClusterShutdownFunction((status) -> { - externalShutdownFuture.complete(status); - return CompletableFuture.completedFuture(Acknowledge.get()); }); - ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3); + // we're "listening" on this to be completed to verify that the error handler is called. 
+ // In production, this will shut down the cluster with an exception. + final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>(); + final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap( + 3, errorHandlerFuture::completeExceptionally); final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor); - // wait until the bootstrap "thinks" it's done - shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + // we call the error handler + assertException(errorHandlerFuture, ApplicationExecutionException.class); - // verify that the dispatcher is actually being shut down - assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.FAILED)); + // we return a future that is completed exceptionally + assertException(shutdownFuture, ApplicationExecutionException.class); } @Test @@ -454,7 +466,7 @@ public class ApplicationDispatcherBootstrapTest { final ApplicationDispatcherBootstrap bootstrap = new ApplicationDispatcherBootstrap( - program, Collections.emptyList(), configuration); + program, Collections.emptyList(), configuration, exception -> {}); return bootstrap.fixJobIdAndRunApplicationAsync( dispatcherBuilder.build(), @@ -462,14 +474,21 @@ public class ApplicationDispatcherBootstrapTest { } private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException { - return createApplicationDispatcherBootstrap(noOfJobs, Collections.emptyList()); + return createApplicationDispatcherBootstrap(noOfJobs, exception -> {}); + } + + private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap( + int noOfJobs, + FatalErrorHandler errorHandler) throws FlinkException { + return createApplicationDispatcherBootstrap(noOfJobs, Collections.emptyList(), errorHandler); } private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap( int noOfJobs, - 
Collection<JobGraph> recoveredJobGraphs) throws FlinkException { + Collection<JobGraph> recoveredJobGraphs, + FatalErrorHandler errorHandler) throws FlinkException { final PackagedProgram program = getProgram(noOfJobs); - return new ApplicationDispatcherBootstrap(program, recoveredJobGraphs, getConfiguration()); + return new ApplicationDispatcherBootstrap(program, recoveredJobGraphs, getConfiguration(), errorHandler); } private PackagedProgram getProgram(int noOfJobs) throws FlinkException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index c4f1670..013d22c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -120,7 +120,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures; - private final DispatcherBootstrap dispatcherBootstrap; + private final Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory; private final ArchivedExecutionGraphStore archivedExecutionGraphStore; @@ -137,10 +137,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher protected final CompletableFuture<ApplicationStatus> shutDownFuture; + private DispatcherBootstrap dispatcherBootstrap; + public Dispatcher( RpcService rpcService, DispatcherId fencingToken, - DispatcherBootstrap dispatcherBootstrap, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { super(rpcService, AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken); checkNotNull(dispatcherServices); @@ -174,7 +176,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher this.shutDownFuture 
= new CompletableFuture<>(); - this.dispatcherBootstrap = checkNotNull(dispatcherBootstrap); + this.dispatcherBootstrapFactory = checkNotNull(dispatcherBootstrapFactory); } //------------------------------------------------------ @@ -199,7 +201,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher throw exception; } - dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor()); + this.dispatcherBootstrap = this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally); + this.dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor()); } private void startDispatcherServices() throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java index 71376c2..e1f3827 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java @@ -18,8 +18,11 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import java.util.function.Function; + /** * {@link Dispatcher} factory interface. */ @@ -29,8 +32,8 @@ public interface DispatcherFactory { * Create a {@link Dispatcher}. 
*/ Dispatcher createDispatcher( - RpcService rpcService, - DispatcherId fencingToken, - DispatcherBootstrap dispatcherBootstrap, - PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception; + RpcService rpcService, + DispatcherId fencingToken, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, + PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java index cc17177..a68da40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java @@ -20,8 +20,11 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import java.util.function.Function; + import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE; /** @@ -34,7 +37,7 @@ public enum JobDispatcherFactory implements DispatcherFactory { public MiniDispatcher createDispatcher( RpcService rpcService, DispatcherId fencingToken, - DispatcherBootstrap dispatcherBootstrap, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception { final Configuration configuration = partialDispatcherServicesWithJobGraphStore.getConfiguration(); final String executionModeValue = configuration.getString(EXECUTION_MODE); @@ -44,7 +47,7 @@ public enum JobDispatcherFactory implements DispatcherFactory { rpcService, fencingToken, 
DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE), - dispatcherBootstrap, + dispatcherBootstrapFactory, executionMode); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 1464b3c..79872ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkException; @@ -34,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -55,12 +57,12 @@ public class MiniDispatcher extends Dispatcher { RpcService rpcService, DispatcherId fencingToken, DispatcherServices dispatcherServices, - DispatcherBootstrap dispatcherBootstrap, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, JobClusterEntrypoint.ExecutionMode executionMode) throws Exception { super( rpcService, fencingToken, - dispatcherBootstrap, + dispatcherBootstrapFactory, dispatcherServices); this.executionMode = checkNotNull(executionMode); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java index cc867ca..35f8d32 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java @@ -18,8 +18,11 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import java.util.function.Function; + /** * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}. */ @@ -30,13 +33,13 @@ public enum SessionDispatcherFactory implements DispatcherFactory { public StandaloneDispatcher createDispatcher( RpcService rpcService, DispatcherId fencingToken, - DispatcherBootstrap dispatcherBootstrap, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception { // create the default dispatcher return new StandaloneDispatcher( rpcService, fencingToken, - dispatcherBootstrap, + dispatcherBootstrapFactory, DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index 4ba50c5..65bd4a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -20,8 +20,11 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import java.util.function.Function; + /** * Dispatcher implementation which spawns a {@link JobMaster} for each * submitted {@link JobGraph} within in 
the same process. This dispatcher @@ -31,12 +34,12 @@ public class StandaloneDispatcher extends Dispatcher { public StandaloneDispatcher( RpcService rpcService, DispatcherId fencingToken, - DispatcherBootstrap dispatcherBootstrap, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { super( rpcService, fencingToken, - dispatcherBootstrap, + dispatcherBootstrapFactory, dispatcherServices); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java index 072d048..e5312d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java @@ -66,7 +66,7 @@ class DefaultDispatcherGatewayServiceFactory implements AbstractDispatcherLeader dispatcher = dispatcherFactory.createDispatcher( rpcService, fencingToken, - bootstrap, + errorHandler -> bootstrap, PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index d2ad995..bc0cad5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -193,7 +193,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { dispatcher = new TestingDispatcher( rpcService, 
DispatcherId.generate(), - new DefaultDispatcherBootstrap(Collections.emptyList()), + errorHandler -> new DefaultDispatcherBootstrap(Collections.emptyList()), new DispatcherServices( configuration, highAvailabilityServices, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index c04e765..8215bc4 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -99,6 +99,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -203,7 +204,8 @@ public class DispatcherTest extends TestLogger { private class TestingDispatcherBuilder { - private DispatcherBootstrap dispatcherBootstrap = new DefaultDispatcherBootstrap(Collections.emptyList()); + private Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory = + errorHandler -> new DefaultDispatcherBootstrap(Collections.emptyList()); private HeartbeatServices heartbeatServices = DispatcherTest.this.heartbeatServices; @@ -223,8 +225,9 @@ public class DispatcherTest extends TestLogger { return this; } - TestingDispatcherBuilder setDispatcherBootstrap(DispatcherBootstrap dispatcherBootstrap) { - this.dispatcherBootstrap = dispatcherBootstrap; + TestingDispatcherBuilder setDispatcherBootstrapFactory( + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory) { + this.dispatcherBootstrapFactory = dispatcherBootstrapFactory; return this; } @@ -246,7 +249,7 @@ public class DispatcherTest extends TestLogger { return new TestingDispatcher( rpcService, DispatcherId.generate(), - 
dispatcherBootstrap, + dispatcherBootstrapFactory, new DispatcherServices( configuration, haServices, @@ -446,7 +449,7 @@ public class DispatcherTest extends TestLogger { final JobGraph failingJobGraph = createFailingJobGraph(testException); dispatcher = new TestingDispatcherBuilder() - .setDispatcherBootstrap(new DefaultDispatcherBootstrap(Collections.singleton(failingJobGraph))) + .setDispatcherBootstrapFactory(errorHandler -> new DefaultDispatcherBootstrap(Collections.singleton(failingJobGraph))) .build(); dispatcher.start(); @@ -598,7 +601,7 @@ public class DispatcherTest extends TestLogger { .build(); dispatcher = new TestingDispatcherBuilder() - .setDispatcherBootstrap(new DefaultDispatcherBootstrap(Collections.singleton(jobGraph))) + .setDispatcherBootstrapFactory(errorHandler -> new DefaultDispatcherBootstrap(Collections.singleton(jobGraph))) .setJobGraphWriter(testingJobGraphStore) .build(); dispatcher.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 2362d65..02dedab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -251,7 +251,7 @@ public class MiniDispatcherTest extends TestLogger { UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), highAvailabilityServices.getJobGraphStore(), testingJobManagerRunnerFactory), - new DefaultDispatcherBootstrap(Collections.singletonList(jobGraph)), + errorHandler -> new DefaultDispatcherBootstrap(Collections.singletonList(jobGraph)), executionMode); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index 22e5d74..fe6472a 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import javax.annotation.Nonnull; @@ -38,12 +39,12 @@ class TestingDispatcher extends Dispatcher { TestingDispatcher( RpcService rpcService, DispatcherId fencingToken, - DispatcherBootstrap dispatcherBootstrap, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { super( rpcService, fencingToken, - dispatcherBootstrap, + dispatcherBootstrapFactory, dispatcherServices); this.startFuture = new CompletableFuture<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java index bcd1eb5..d31a7f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; import 
org.apache.flink.runtime.testingUtils.TestingUtils; @@ -65,6 +66,7 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -206,12 +208,12 @@ public class DefaultDispatcherRunnerITCase extends TestLogger { public Dispatcher createDispatcher( RpcService rpcService, DispatcherId fencingToken, - DispatcherBootstrap dispatcherBootstrap, + Function<FatalErrorHandler, DispatcherBootstrap> dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception { return new StandaloneDispatcher( rpcService, fencingToken, - dispatcherBootstrap, + dispatcherBootstrapFactory, DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, jobManagerRunnerFactory)); } }
