dmvk commented on a change in pull request #16535: URL: https://github.com/apache/flink/pull/16535#discussion_r683322034
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TimeUtils; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; + +/** Abstract test for the {@link Dispatcher} component. */ +public class AbstractDispatcherTest extends TestLogger { + + static TestingRpcService rpcService; + + static final Time TIMEOUT = Time.seconds(10L); Review comment: increased to 1m ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ########## @@ -219,11 +156,19 @@ private TestingDispatcher createAndStartDispatcher( TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception { + JobGraphWriter jobGraphWriter; + try { + jobGraphWriter = haServices.getJobGraphStore(); + } catch (IllegalStateException e) { + jobGraphWriter = NoOpJobGraphWriter.INSTANCE; + } Review comment: 👍 ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcEndpoint; 
+import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +/** An integration test for various fail-over scenarios of the {@link Dispatcher} component. */ +public class DispatcherFailoverITCase extends AbstractDispatcherTest { + + private static final Time TIMEOUT = Time.seconds(1); + + private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue<>(); + + @Before + public void setUp() throws Exception { + super.setUp(); + final CompletedCheckpointStore completedCheckpointStore = + new EmbeddedCompletedCheckpointStore(); + haServices.setCheckpointRecoveryFactory( + PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs( + completedCheckpointStore, new StandaloneCheckpointIDCounter())); + } + + @After + public void tearDown() { + while (!toTerminate.isEmpty()) { + final RpcEndpoint endpoint = toTerminate.poll(); + try { + RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); + } catch (Exception e) { + // Ignore. + } + } + } + + @Test + public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() + throws Exception { + final JobGraph jobGraph = createJobGraph(); + final JobID jobId = jobGraph.getJobID(); + + // Construct job graph store. 
+ final Error jobGraphRemovalError = new Error("Unable to remove job graph."); + final TestingJobGraphStore jobGraphStore = + TestingJobGraphStore.newBuilder() + .setRemoveJobGraphConsumer( + graph -> { + throw jobGraphRemovalError; + }) + .build(); + jobGraphStore.start(null); + haServices.setJobGraphStore(jobGraphStore); + + // Construct leader election service. + final TestingLeaderElectionService leaderElectionService = + new TestingLeaderElectionService(); + haServices.setJobMasterLeaderElectionService(jobId, leaderElectionService); + + // Start the first dispatcher and submit the job. + final CountDownLatch jobGraphRemovalErrorReceived = new CountDownLatch(1); + final Dispatcher dispatcher = + createRecoveredDispatcher( + throwable -> { + final Optional<Error> maybeError = + ExceptionUtils.findThrowable(throwable, Error.class); + if (maybeError.isPresent() + && jobGraphRemovalError.equals(maybeError.get())) { + jobGraphRemovalErrorReceived.countDown(); + } else { + testingFatalErrorHandlerResource + .getFatalErrorHandler() + .onFatalError(throwable); + } + }); + toTerminate.add(dispatcher); + leaderElectionService.isLeader(UUID.randomUUID()); + final DispatcherGateway dispatcherGateway = + dispatcher.getSelfGateway(DispatcherGateway.class); + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING); + + // Run vertices, checkpoint and finish. 
+ final JobMasterGateway jobMasterGateway = + connectToLeadingJobMaster(leaderElectionService).get(); + try (final JobMasterTester tester = + new JobMasterTester(rpcService, jobId, jobMasterGateway)) { + final List<TaskDeploymentDescriptor> descriptors = tester.deployVertices(2).get(); + tester.transitionTo(descriptors, ExecutionState.INITIALIZING).get(); + tester.transitionTo(descriptors, ExecutionState.RUNNING).get(); + tester.getCheckpointFuture(1L).get(); + tester.transitionTo(descriptors, ExecutionState.FINISHED).get(); + } + awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED); + assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS)); + + // Remove job master leadership. + leaderElectionService.stop(); + + // Run a second dispatcher, that restores our finished job. + final Dispatcher secondDispatcher = createRecoveredDispatcher(null); + toTerminate.add(secondDispatcher); + final DispatcherGateway secondDispatcherGateway = + secondDispatcher.getSelfGateway(DispatcherGateway.class); + UUID uuid = UUID.randomUUID(); + leaderElectionService.isLeader(uuid); Review comment: I'm not able to fully close the first dispatcher here, as the error would be thrown again during job graph removal. I guess this is also closer to a real-world scenario, where other services wouldn't be closed at all. I'll try to make the behavior of `leaderElectionService.stop()` a bit more explicit with a comment. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.TestingJobGraphStore; 
+import org.apache.flink.util.ExceptionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +/** An integration test for various fail-over scenarios of the {@link Dispatcher} component. */ +public class DispatcherFailoverITCase extends AbstractDispatcherTest { + + private static final Time TIMEOUT = Time.seconds(1); + + private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue<>(); + + @Before + public void setUp() throws Exception { + super.setUp(); + final CompletedCheckpointStore completedCheckpointStore = + new EmbeddedCompletedCheckpointStore(); + haServices.setCheckpointRecoveryFactory( + PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs( + completedCheckpointStore, new StandaloneCheckpointIDCounter())); + } + + @After + public void tearDown() { + while (!toTerminate.isEmpty()) { + final RpcEndpoint endpoint = toTerminate.poll(); + try { + RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); + } catch (Exception e) { + // Ignore. + } + } + } + + @Test + public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() + throws Exception { + final JobGraph jobGraph = createJobGraph(); + final JobID jobId = jobGraph.getJobID(); + + // Construct job graph store. 
+ final Error jobGraphRemovalError = new Error("Unable to remove job graph."); + final TestingJobGraphStore jobGraphStore = + TestingJobGraphStore.newBuilder() + .setRemoveJobGraphConsumer( + graph -> { + throw jobGraphRemovalError; + }) + .build(); + jobGraphStore.start(null); + haServices.setJobGraphStore(jobGraphStore); + + // Construct leader election service. + final TestingLeaderElectionService leaderElectionService = + new TestingLeaderElectionService(); + haServices.setJobMasterLeaderElectionService(jobId, leaderElectionService); + + // Start the first dispatcher and submit the job. + final CountDownLatch jobGraphRemovalErrorReceived = new CountDownLatch(1); + final Dispatcher dispatcher = + createRecoveredDispatcher( + throwable -> { + final Optional<Error> maybeError = + ExceptionUtils.findThrowable(throwable, Error.class); + if (maybeError.isPresent() + && jobGraphRemovalError.equals(maybeError.get())) { + jobGraphRemovalErrorReceived.countDown(); + } else { + testingFatalErrorHandlerResource + .getFatalErrorHandler() + .onFatalError(throwable); + } + }); + toTerminate.add(dispatcher); + leaderElectionService.isLeader(UUID.randomUUID()); + final DispatcherGateway dispatcherGateway = + dispatcher.getSelfGateway(DispatcherGateway.class); + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING); + + // Run vertices, checkpoint and finish. 
+ final JobMasterGateway jobMasterGateway = + connectToLeadingJobMaster(leaderElectionService).get(); + try (final JobMasterTester tester = + new JobMasterTester(rpcService, jobId, jobMasterGateway)) { + final List<TaskDeploymentDescriptor> descriptors = tester.deployVertices(2).get(); + tester.transitionTo(descriptors, ExecutionState.INITIALIZING).get(); + tester.transitionTo(descriptors, ExecutionState.RUNNING).get(); + tester.getCheckpointFuture(1L).get(); + tester.transitionTo(descriptors, ExecutionState.FINISHED).get(); + } + awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED); + assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS)); + + // Remove job master leadership. + leaderElectionService.stop(); + + // Run a second dispatcher, that restores our finished job. + final Dispatcher secondDispatcher = createRecoveredDispatcher(null); + toTerminate.add(secondDispatcher); + final DispatcherGateway secondDispatcherGateway = + secondDispatcher.getSelfGateway(DispatcherGateway.class); + UUID uuid = UUID.randomUUID(); + leaderElectionService.isLeader(uuid); Review comment: I've also added a call to `leaderElectionService.notLeader();`, so it's clear that we revoke leadership of the first dispatcher.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -743,31 +744,43 @@ private void registerJobManagerRunnerTerminationFuture( private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) { final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId)); - - final CompletableFuture<Void> jobTerminationFuture = job.closeAsync(); - - return jobTerminationFuture.thenRunAsync( - () -> cleanUpJobData(jobId, cleanupJobState.cleanupHAData), ioExecutor); + return CompletableFuture.supplyAsync( + () -> cleanUpJobGraph(jobId, cleanupJobState.cleanupHAData), ioExecutor) + .thenCompose( + jobGraphRemoved -> job.closeAsync().thenApply(ignored -> jobGraphRemoved)) + .thenAcceptAsync( + jobGraphRemoved -> + cleanUpRemainingJobData( + jobId, cleanupJobState.cleanupHAData, jobGraphRemoved), + ioExecutor); } - private void cleanUpJobData(JobID jobId, boolean cleanupHA) { - jobManagerMetricGroup.removeJob(jobId); - - boolean jobGraphRemoved = false; + private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) { if (cleanupHA) { try { jobGraphWriter.removeJobGraph(jobId); - // only clean up the HA blobs and ha service data for the particular job // if we could remove the job from HA storage - jobGraphRemoved = true; + return true; } catch (Exception e) { log.warn( "Could not properly remove job {} from submitted job graph store.", jobId, e); + return false; } + } + try { + jobGraphWriter.releaseJobGraph(jobId); + } catch (Exception e) { + log.warn("Could not properly release job {} from submitted job graph store.", jobId, e); + } + return false; + } + private void cleanUpRemainingJobData(JobID jobId, boolean cleanupHA, boolean jobGraphRemoved) { Review comment: 👍 I think that's a reasonable trade-off for now (until FLINK-11813 is resolved). 
If we were not able to remove the job graph, we're in for bigger trouble anyway :) Dispatcher tests seem to pass, so let's see whether this introduces any test failures in the e2e tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
