tillrohrmann commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r579205110
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointScheduling.java ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +/** + * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling. Review comment: ```suggestion * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling of checkpoints. ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { Review comment: The convention in Flink is that everything which is not annotated with `@Nullable` is considered to be non-null: https://flink.apache.org/contributing/code-style-and-quality-common.html#nullability-of-the-mutable-parts. Hence, `@Nonnull` should not be necessary here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( + aVoid -> + terminatedExecutionsFuture.thenAcceptAsync( + this::handleExecutionsTermination, mainThreadExecutor)); Review comment: I think this logic does not really belong to this class. If the class states that one first needs to signal the savepoint status and then only the termination status, then it should be good enough. 
Moreover, by not doing this part in this class, the class does not need to know about the concept of a main thread. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. 
+ * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( + aVoid -> + terminatedExecutionsFuture.thenAcceptAsync( Review comment: Why is 
`async` required here? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. 
+ * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( + aVoid -> + terminatedExecutionsFuture.thenAcceptAsync( Review comment: Maybe guard 
this result via `FutureUtils.assertNoException` in order to make sure that `terminatedExecutionsFuture` can never be completed exceptionally. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImplTest.java ########## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.QuadConsumer; +import org.apache.flink.util.function.TriConsumer; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointTerminationHandlerImplTest} tests the stop-with-savepoint functionality + * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointTerminationHandlerImplTest extends TestLogger { + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final JobID JOB_ID = new JobID(); + + private final TestingCheckpointScheduling checkpointScheduling = + new TestingCheckpointScheduling(false); + + private StopWithSavepointTerminationHandlerImpl createTestInstance( + Consumer<Throwable> handleGlobalFailureConsumer) { + // checkpointing should be always stopped before initiating stop-with-savepoint + checkpointScheduling.stopCheckpointScheduler(); + + final SchedulerNG scheduler = + TestingSchedulerNG.newBuilder() + .setHandleGlobalFailureConsumer(handleGlobalFailureConsumer) + .build(); + return new StopWithSavepointTerminationHandlerImpl( + JOB_ID, scheduler, checkpointScheduling, log); + } + + @Test + public void testHappyPathWithSavepointCreationBeforeSuccessfulTermination() throws Exception { + assertHappyPath( + (completedSavepoint, + completedSavepointFuture, + terminatedExecutionStates, + executionsTerminatedFuture) -> { + completedSavepointFuture.complete(completedSavepoint); + executionsTerminatedFuture.complete(terminatedExecutionStates); + }); + } + + @Test + public void testHappyPathWithSavepointCreationAfterSuccessfulTermination() throws Exception { + assertHappyPath( + (completedSavepoint, + completedSavepointFuture, + terminatedExecutionStates, + executionsTerminatedFuture) -> { + executionsTerminatedFuture.complete(terminatedExecutionStates); + completedSavepointFuture.complete(completedSavepoint); + }); + } + + @Test + public void testSavepointCreationFailureBeforeSuccessfulTermination() { + assertSavepointCreationFailure( + (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> { + completedSavepointFuture.completeExceptionally(expectedException); + executionsTerminatedFuture.complete( + Collections.singletonList(ExecutionState.FINISHED)); + }); + } + + @Test + public void 
testSavepointCreationFailureAfterSuccessfulTermination() { + assertSavepointCreationFailure( + (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> { + executionsTerminatedFuture.complete( + Collections.singletonList(ExecutionState.FINISHED)); + completedSavepointFuture.completeExceptionally(expectedException); Review comment: If we make the ordering of signals the responsibility of a different component, then we wouldn't have this combinatorial explosion of test cases. Then we would have test cases for the different signal order completions and test cases for the `StopWithSavepointTerminationHandlerImpl` where we can concentrate on the handler-specific functionality. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( + aVoid -> + terminatedExecutionsFuture.thenAcceptAsync( + this::handleExecutionsTermination, mainThreadExecutor)); + + return result; + } + + private synchronized void handleSavepointCreation(CompletedCheckpoint completedCheckpoint) { + final State oldState = state; + state = state.onSavepointCreation(completedCheckpoint); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on 
savepoint creation handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleSavepointCreationFailure(Throwable throwable) { + final State oldState = state; + state = state.onSavepointCreationFailure(throwable); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on savepoint creation failure handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleExecutionsTermination( + Collection<ExecutionState> executionStates) { + final State oldState = state; + state = state.onExecutionsTermination(executionStates); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on execution termination handling for job {}.", + oldState, + state, + jobId); + } + + /** + * Handles the termination of the {@code StopWithSavepointTerminationHandler} exceptionally + * after triggering a global job fail-over. + * + * @param completedSavepoint the completed savepoint that needs to be discarded. + * @param unfinishedExecutionStates the unfinished states that caused the failure. + */ + private void terminateExceptionallyWithGlobalFailover( + CompletedCheckpoint completedSavepoint, + Iterable<ExecutionState> unfinishedExecutionStates) { + String errorMessage = + String.format( + "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", + StringUtils.join(unfinishedExecutionStates, ", "), jobId); + FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + + scheduler.handleGlobalFailure(inconsistentFinalStateException); + + try { + completedSavepoint.discard(); + } catch (Exception e) { + log.warn( + "Error occurred while cleaning up completed savepoint due to stop-with-savepoint failure.", + e); + inconsistentFinalStateException.addSuppressed(e); + } Review comment: Alternatively, we need to give the `completedSavepoint` to the owner of this class in case of a termination failure. The owner would then be responsible for cleaning the savepoint up. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( + aVoid -> + terminatedExecutionsFuture.thenAcceptAsync( + this::handleExecutionsTermination, mainThreadExecutor)); + + return result; + } + + private synchronized void handleSavepointCreation(CompletedCheckpoint completedCheckpoint) { + final State oldState = state; + state = state.onSavepointCreation(completedCheckpoint); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on 
savepoint creation handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleSavepointCreationFailure(Throwable throwable) { + final State oldState = state; + state = state.onSavepointCreationFailure(throwable); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on savepoint creation failure handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleExecutionsTermination( + Collection<ExecutionState> executionStates) { + final State oldState = state; + state = state.onExecutionsTermination(executionStates); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on execution termination handling for job {}.", + oldState, + state, + jobId); + } + + /** + * Handles the termination of the {@code StopWithSavepointTerminationHandler} exceptionally + * after triggering a global job fail-over. + * + * @param completedSavepoint the completed savepoint that needs to be discarded. + * @param unfinishedExecutionStates the unfinished states that caused the failure. + */ + private void terminateExceptionallyWithGlobalFailover( + CompletedCheckpoint completedSavepoint, + Iterable<ExecutionState> unfinishedExecutionStates) { + String errorMessage = + String.format( + "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", + StringUtils.join(unfinishedExecutionStates, ", "), jobId); + FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + + scheduler.handleGlobalFailure(inconsistentFinalStateException); + + try { + completedSavepoint.discard(); + } catch (Exception e) { + log.warn( + "Error occurred while cleaning up completed savepoint due to stop-with-savepoint failure.", + e); + inconsistentFinalStateException.addSuppressed(e); + } + + terminateExceptionally(inconsistentFinalStateException); + } + + /** + * Handles the termination of the {@code StopWithSavepointTerminationHandler} exceptionally + * without triggering a global job fail-over. It does restart the checkpoint scheduling. + * + * @param throwable the error that caused the exceptional termination. + */ + private void terminateExceptionally(Throwable throwable) { + checkpointScheduling.startCheckpointScheduler(); + result.completeExceptionally(throwable); + } + + /** + * Handles the successful termination of the {@code StopWithSavepointTerminationHandler}. + * + * @param path the path where the savepoint is stored. 
+ */ + private void terminateSuccessfully(String path) { + result.complete(path); + } + + private static Set<ExecutionState> extractUnfinishedStates( + Collection<ExecutionState> executionStates) { + return executionStates.stream() + .filter(state -> state != ExecutionState.FINISHED) + .collect(Collectors.toSet()); + } + + private final class WaitingForSavepoint implements State { + + @Override + public State onSavepointCreation(CompletedCheckpoint completedSavepoint) { + return new SavepointCreated(completedSavepoint); + } + + @Override + public State onSavepointCreationFailure(Throwable throwable) { + terminateExceptionally(throwable); + return new FinalState(); + } + } + + private final class SavepointCreated implements State { + + private final CompletedCheckpoint completedSavepoint; + + private SavepointCreated(CompletedCheckpoint completedSavepoint) { + this.completedSavepoint = completedSavepoint; + } + + @Override + public State onExecutionsTermination(Collection<ExecutionState> executionStates) { + final Set<ExecutionState> unfinishedStates = extractUnfinishedStates(executionStates); + + if (unfinishedStates.isEmpty()) { + terminateSuccessfully(completedSavepoint.getExternalPointer()); + return new FinalState(); + } + + terminateExceptionallyWithGlobalFailover(completedSavepoint, unfinishedStates); + return new FinalState(); + } + } + + private final class FinalState implements State { + + @Override + public State onExecutionsTermination(Collection<ExecutionState> executionStates) { + return this; + } + } + + private interface State { + + default State onSavepointCreation(CompletedCheckpoint completedSavepoint) { + throw new UnsupportedOperationException( + this.getClass().getSimpleName() + + " state does not support onSavepointCreation."); + } + + default State onSavepointCreationFailure(Throwable throwable) { + throw new UnsupportedOperationException( + this.getClass().getSimpleName() + + " state does not support onSavepointCreationFailure."); + } + + 
default State onExecutionsTermination(Collection<ExecutionState> executionStates) { + throw new UnsupportedOperationException( + this.getClass().getSimpleName() + + " state does not support onExecutionsTermination."); + } + } Review comment: This looks simple and easy to understand :-) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) Review comment: I would suggest consuming the exception via `handleAsync` and then adding an `ExceptionUtils.assertNoException` in order to make potential misuses of the state call visible. Otherwise, if `handleSavepoint*` fails, then this exception will be swallowed by the returned future. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. 
If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( Review comment: `thenRun` should be fine here. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. 
If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. + */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( + aVoid -> + terminatedExecutionsFuture.thenAcceptAsync( + this::handleExecutionsTermination, mainThreadExecutor)); + + return result; + } + + private synchronized void handleSavepointCreation(CompletedCheckpoint 
completedCheckpoint) { + final State oldState = state; + state = state.onSavepointCreation(completedCheckpoint); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on savepoint creation handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleSavepointCreationFailure(Throwable throwable) { + final State oldState = state; + state = state.onSavepointCreationFailure(throwable); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on savepoint creation failure handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleExecutionsTermination( Review comment: Why are these methods `synchronized`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code StopWithSavepointTerminationHandlerImpl} implements {@link + * StopWithSavepointTerminationHandler}. + * + * <p>The operation only succeeds if both steps, the savepoint creation and the successful + * termination of the job, succeed. If the former step fails, the operation fails exceptionally + * without any further actions. If the latter one fails, a global fail-over is triggered before + * failing the operation. 
+ */ +public class StopWithSavepointTerminationHandlerImpl + implements StopWithSavepointTerminationHandler { + + private final Logger log; + + private final SchedulerNG scheduler; + private final CheckpointScheduling checkpointScheduling; + private final JobID jobId; + + private final CompletableFuture<String> result = new CompletableFuture<>(); + + private State state = new WaitingForSavepoint(); + + public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) { + this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log); + } + + @VisibleForTesting + StopWithSavepointTerminationHandlerImpl( + @Nonnull JobID jobId, + @Nonnull SchedulerNG scheduler, + @Nonnull CheckpointScheduling checkpointScheduling, + @Nonnull Logger log) { + this.jobId = checkNotNull(jobId); + this.scheduler = checkNotNull(scheduler); + this.checkpointScheduling = checkNotNull(checkpointScheduling); + this.log = checkNotNull(log); + } + + @Override + public CompletableFuture<String> handlesStopWithSavepointTermination( + CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture, + ComponentMainThreadExecutor mainThreadExecutor) { + completedSavepointFuture + .whenCompleteAsync( + (completedSavepoint, throwable) -> { + if (throwable != null) { + handleSavepointCreationFailure(throwable); + } else { + handleSavepointCreation(completedSavepoint); + } + }, + mainThreadExecutor) + .thenCompose( + aVoid -> + terminatedExecutionsFuture.thenAcceptAsync( + this::handleExecutionsTermination, mainThreadExecutor)); + + return result; + } + + private synchronized void handleSavepointCreation(CompletedCheckpoint completedCheckpoint) { + final State oldState = state; + state = state.onSavepointCreation(completedCheckpoint); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on 
savepoint creation handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleSavepointCreationFailure(Throwable throwable) { + final State oldState = state; + state = state.onSavepointCreationFailure(throwable); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on savepoint creation failure handling for job {}.", + oldState, + state, + jobId); + } + + private synchronized void handleExecutionsTermination( + Collection<ExecutionState> executionStates) { + final State oldState = state; + state = state.onExecutionsTermination(executionStates); + + log.debug( + "Stop-with-savepoint transitioned from {} to {} on execution termination handling for job {}.", + oldState, + state, + jobId); + } + + /** + * Handles the termination of the {@code StopWithSavepointTerminationHandler} exceptionally + * after triggering a global job fail-over. + * + * @param completedSavepoint the completed savepoint that needs to be discarded. + * @param unfinishedExecutionStates the unfinished states that caused the failure. + */ + private void terminateExceptionallyWithGlobalFailover( + CompletedCheckpoint completedSavepoint, + Iterable<ExecutionState> unfinishedExecutionStates) { + String errorMessage = + String.format( + "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. A global fail-over is triggered to recover the job %s.", + StringUtils.join(unfinishedExecutionStates, ", "), jobId); + FlinkException inconsistentFinalStateException = new FlinkException(errorMessage); + + scheduler.handleGlobalFailure(inconsistentFinalStateException); + + try { + completedSavepoint.discard(); + } catch (Exception e) { + log.warn( + "Error occurred while cleaning up completed savepoint due to stop-with-savepoint failure.", + e); + inconsistentFinalStateException.addSuppressed(e); + } Review comment: This can entail IO operations. 
IO operations should not be executed in the main thread because they can be blocking. If we want to discard the savepoint, then we should pass in an `ioExecutor` which can be used for this. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImplTest.java ########## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.QuadConsumer; +import org.apache.flink.util.function.TriConsumer; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@code StopWithSavepointTerminationHandlerImplTest} tests the stop-with-savepoint functionality + * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}. 
+ */ +public class StopWithSavepointTerminationHandlerImplTest extends TestLogger { + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final JobID JOB_ID = new JobID(); + + private final TestingCheckpointScheduling checkpointScheduling = + new TestingCheckpointScheduling(false); + + private StopWithSavepointTerminationHandlerImpl createTestInstance( + Consumer<Throwable> handleGlobalFailureConsumer) { + // checkpointing should be always stopped before initiating stop-with-savepoint + checkpointScheduling.stopCheckpointScheduler(); + + final SchedulerNG scheduler = + TestingSchedulerNG.newBuilder() + .setHandleGlobalFailureConsumer(handleGlobalFailureConsumer) + .build(); + return new StopWithSavepointTerminationHandlerImpl( + JOB_ID, scheduler, checkpointScheduling, log); + } + + @Test + public void testHappyPathWithSavepointCreationBeforeSuccessfulTermination() throws Exception { + assertHappyPath( + (completedSavepoint, + completedSavepointFuture, + terminatedExecutionStates, + executionsTerminatedFuture) -> { + completedSavepointFuture.complete(completedSavepoint); + executionsTerminatedFuture.complete(terminatedExecutionStates); + }); + } + + @Test + public void testHappyPathWithSavepointCreationAfterSuccessfulTermination() throws Exception { + assertHappyPath( + (completedSavepoint, + completedSavepointFuture, + terminatedExecutionStates, + executionsTerminatedFuture) -> { + executionsTerminatedFuture.complete(terminatedExecutionStates); + completedSavepointFuture.complete(completedSavepoint); + }); + } + + @Test + public void testSavepointCreationFailureBeforeSuccessfulTermination() { + assertSavepointCreationFailure( + (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> { + completedSavepointFuture.completeExceptionally(expectedException); + executionsTerminatedFuture.complete( + Collections.singletonList(ExecutionState.FINISHED)); + }); + } + + @Test + public void 
testSavepointCreationFailureAfterSuccessfulTermination() { + assertSavepointCreationFailure( + (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> { + executionsTerminatedFuture.complete( + Collections.singletonList(ExecutionState.FINISHED)); + completedSavepointFuture.completeExceptionally(expectedException); + }); + } + + @Test + public void testSavepointCreationFailureBeforeTaskFailure() { + assertSavepointCreationFailure( + (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> { + completedSavepointFuture.completeExceptionally(expectedException); + executionsTerminatedFuture.complete( + Collections.singletonList(ExecutionState.FAILED)); + }); + } + + @Test + public void testSavepointCreationFailureAfterTaskFailure() { + assertSavepointCreationFailure( + (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> { + executionsTerminatedFuture.complete( + Collections.singletonList(ExecutionState.FAILED)); + completedSavepointFuture.completeExceptionally(expectedException); + }); + } + + @Test + public void testNoTerminationHandlingAfterSavepointCompletion() { + assertNoTerminationHandling( + (completedSavepoint, + completedSavepointFuture, + terminatedExecutionStates, + executionsTerminatedFuture) -> { + completedSavepointFuture.complete(completedSavepoint); + executionsTerminatedFuture.complete(terminatedExecutionStates); + }); + } + + @Test + public void testNoTerminationHandlingBeforeSavepointCompletion() { + assertNoTerminationHandling( + (completedSavepoint, + completedSavepointFuture, + terminatedExecutionStates, + executionsTerminatedFuture) -> { + executionsTerminatedFuture.complete(terminatedExecutionStates); + completedSavepointFuture.complete(completedSavepoint); + }); + } + + private void assertHappyPath( + final QuadConsumer< + CompletedCheckpoint, + CompletableFuture<CompletedCheckpoint>, + Collection<ExecutionState>, + CompletableFuture<Collection<ExecutionState>>> + completion) + 
throws ExecutionException, InterruptedException { + final StopWithSavepointTerminationHandlerImpl testInstance = + createTestInstance( + throwable -> fail("No global fail-over should have been triggered.")); + + final CompletableFuture<CompletedCheckpoint> completedSavepointFuture = + new CompletableFuture<>(); + final CompletableFuture<Collection<ExecutionState>> executionsTerminatedFuture = + new CompletableFuture<>(); + + final CompletableFuture<String> result = + testInstance.handlesStopWithSavepointTermination( + completedSavepointFuture, + executionsTerminatedFuture, + ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + final String savepointPath = "savepoint-path"; + completion.accept( + createCompletedSavepoint(savepointPath), + completedSavepointFuture, + Collections.singleton(ExecutionState.FINISHED), + executionsTerminatedFuture); + + assertThat(result.get(), is(savepointPath)); + + // the happy path won't restart the checkpoint scheduling + assertFalse("Checkpoint scheduling should be disabled.", checkpointScheduling.isEnabled()); + } + + private void assertSavepointCreationFailure( + final TriConsumer< + Throwable, + CompletableFuture<CompletedCheckpoint>, + CompletableFuture<Collection<ExecutionState>>> + completion) { + final StopWithSavepointTerminationHandlerImpl testInstance = + createTestInstance(throwable -> fail("No global failover should be triggered.")); + + final CompletableFuture<CompletedCheckpoint> completedSavepointFuture = + new CompletableFuture<>(); + final CompletableFuture<Collection<ExecutionState>> executionsTerminatedFuture = + new CompletableFuture<>(); + + final CompletableFuture<String> result = + testInstance.handlesStopWithSavepointTermination( + completedSavepointFuture, + executionsTerminatedFuture, + ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + final String expectedErrorMessage = "Expected exception during savepoint creation."; + completion.accept( + new Exception(expectedErrorMessage), + 
completedSavepointFuture, + executionsTerminatedFuture); + + try { + result.get(); + fail("An ExecutionException is expected."); + } catch (Throwable e) { + final Optional<Throwable> actualException = + ExceptionUtils.findThrowableWithMessage(e, expectedErrorMessage); + assertTrue( + "An exception with the expected error message should have been thrown.", + actualException.isPresent()); + } + + // the checkpoint scheduling should be enabled in case of failure + assertTrue("Checkpoint scheduling should be enabled.", checkpointScheduling.isEnabled()); + } + + private void assertNoTerminationHandling( + final QuadConsumer< + CompletedCheckpoint, + CompletableFuture<CompletedCheckpoint>, + Collection<ExecutionState>, + CompletableFuture<Collection<ExecutionState>>> + completion) { + final ExecutionState expectedNonFinishedState = ExecutionState.FAILED; + final String expectedErrorMessage = + String.format( + "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.", + expectedNonFinishedState, JOB_ID); + + final EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle(); + final CompletedCheckpoint completedSavepoint = + createCompletedSavepoint(streamStateHandle, "savepoint-folder"); + + // we have to verify that the handle pointing to the savepoint's metadata is not disposed, + // yet + assertFalse( + "The completed savepoint must not be disposed, yet.", + streamStateHandle.isDisposed()); + + final StopWithSavepointTerminationHandlerImpl testInstance = + createTestInstance( + throwable -> + assertThat( + throwable, + FlinkMatchers.containsMessage(expectedErrorMessage))); + + final CompletableFuture<CompletedCheckpoint> completedSavepointFuture = + new CompletableFuture<>(); + final CompletableFuture<Collection<ExecutionState>> executionsTerminatedFuture = + new CompletableFuture<>(); + + final CompletableFuture<String> result = + testInstance.handlesStopWithSavepointTermination( + completedSavepointFuture, + executionsTerminatedFuture, + ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + completion.accept( + completedSavepoint, + completedSavepointFuture, + Collections.singletonList(expectedNonFinishedState), + executionsTerminatedFuture); + + try { + result.get(); + fail("An ExecutionException is expected."); + } catch (Throwable e) { + final Optional<FlinkException> actualFlinkException = + ExceptionUtils.findThrowable(e, FlinkException.class); + assertTrue( + "A FlinkException should have been thrown.", actualFlinkException.isPresent()); + assertThat( + actualFlinkException.get(), + FlinkMatchers.containsMessage(expectedErrorMessage)); + } + + // the checkpoint scheduling should be enabled in case of failure + assertTrue("Checkpoint scheduling should be enabled.", checkpointScheduling.isEnabled()); + + assertTrue("The savepoint should be cleaned up.", streamStateHandle.isDisposed()); Review comment: How do we assert that we call the global 
failover? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
