rmetzger commented on a change in pull request #14879: URL: https://github.com/apache/flink/pull/14879#discussion_r572672740
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java ########## @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import 
org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.SerializedInputSplit; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.KvStateHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * Abstract state class which contains an {@link ExecutionGraph} and the required handlers to + * execute common operations. 
+ */ +abstract class StateWithExecutionGraph implements State { + private final Context context; + + private final ExecutionGraph executionGraph; + + private final ExecutionGraphHandler executionGraphHandler; + + private final OperatorCoordinatorHandler operatorCoordinatorHandler; + + private final KvStateHandler kvStateHandler; + + private final Logger logger; + + StateWithExecutionGraph( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger) { + this.context = context; + this.executionGraph = executionGraph; + this.executionGraphHandler = executionGraphHandler; + this.operatorCoordinatorHandler = operatorCoordinatorHandler; + this.kvStateHandler = new KvStateHandler(executionGraph); + this.logger = logger; + + FutureUtils.assertNoException( + executionGraph + .getTerminationFuture() + .thenAcceptAsync( + jobStatus -> { + context.runIfState(this, () -> onTerminalState(jobStatus)); + }, + context.getMainThreadExecutor())); + } + + ExecutionGraph getExecutionGraph() { + return executionGraph; + } + + OperatorCoordinatorHandler getOperatorCoordinatorHandler() { + return operatorCoordinatorHandler; + } + + ExecutionGraphHandler getExecutionGraphHandler() { + return executionGraphHandler; + } + + @Override + public void onLeave(State newState) { + if (!StateWithExecutionGraph.class.isAssignableFrom(newState.getClass())) { + // we are leaving the StateWithExecutionGraph --> we need to dispose temporary services + operatorCoordinatorHandler.disposeAllOperatorCoordinators(); + } + } + + @Override + public ArchivedExecutionGraph getJob() { + return ArchivedExecutionGraph.createFrom(executionGraph); + } + + @Override + public JobStatus getJobStatus() { + return executionGraph.getState(); + } + + @Override + public void suspend(Throwable cause) { + executionGraph.suspend(cause); + Preconditions.checkState(executionGraph.getState() == JobStatus.SUSPENDED); + 
context.goToFinished(ArchivedExecutionGraph.createFrom(executionGraph)); + } + + @Override + public Logger getLogger() { + return logger; + } + + void notifyPartitionDataAvailable(ResultPartitionID partitionID) { + executionGraph.notifyPartitionDataAvailable(partitionID); + } + + SerializedInputSplit requestNextInputSplit( + JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException { + return executionGraphHandler.requestNextInputSplit(vertexID, executionAttempt); + } + + ExecutionState requestPartitionState( + IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) + throws PartitionProducerDisposedException { + return executionGraphHandler.requestPartitionState(intermediateResultId, resultPartitionId); + } + + void acknowledgeCheckpoint( + JobID jobID, + ExecutionAttemptID executionAttemptID, + long checkpointId, + CheckpointMetrics checkpointMetrics, + TaskStateSnapshot checkpointState) { + + executionGraphHandler.acknowledgeCheckpoint( + jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState); + } + + void declineCheckpoint(DeclineCheckpoint decline) { + executionGraphHandler.declineCheckpoint(decline); + } + + void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { + executionGraph.updateAccumulators(accumulatorSnapshot); + } + + KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) + throws FlinkJobNotFoundException, UnknownKvStateLocation { + return kvStateHandler.requestKvStateLocation(jobId, registrationName); + } + + void notifyKvStateRegistered( + JobID jobId, + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName, + KvStateID kvStateId, + InetSocketAddress kvStateServerAddress) + throws FlinkJobNotFoundException { + kvStateHandler.notifyKvStateRegistered( + jobId, + jobVertexId, + keyGroupRange, + registrationName, + kvStateId, + kvStateServerAddress); + } + + void notifyKvStateUnregistered( + JobID jobId, + JobVertexID 
jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) + throws FlinkJobNotFoundException { + kvStateHandler.notifyKvStateUnregistered( + jobId, jobVertexId, keyGroupRange, registrationName); + } + + CompletableFuture<String> triggerSavepoint(String targetDirectory, boolean cancelJob) { + final CheckpointCoordinator checkpointCoordinator = + executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + throw new IllegalStateException( + String.format("Job %s is not a streaming job.", executionGraph.getJobID())); + } else if (targetDirectory == null + && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) { + logger.info( + "Trying to cancel job {} with savepoint, but no savepoint directory configured.", + executionGraph.getJobID()); + + throw new IllegalStateException( + "No savepoint directory configured. You can either specify a directory " + + "while cancelling via -s :targetDirectory or configure a cluster-wide " + + "default via key '" + + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + + "'."); + } + + logger.info( + "Triggering {}savepoint for job {}.", + cancelJob ? "cancel-with-" : "", + executionGraph.getJobID()); + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + + return checkpointCoordinator + .triggerSavepoint(targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .handleAsync( + (path, throwable) -> { + if (throwable != null) { + if (cancelJob && context.isState(this)) { + startCheckpointScheduler(checkpointCoordinator); + } + throw new CompletionException(throwable); + } else if (cancelJob && context.isState(this)) { + logger.info( + "Savepoint stored in {}. 
Now cancelling {}.", + path, + executionGraph.getJobID()); + cancel(); + } + return path; + }, + context.getMainThreadExecutor()); + } + + CompletableFuture<String> stopWithSavepoint( + String targetDirectory, boolean advanceToEndOfEventTime) { + final CheckpointCoordinator checkpointCoordinator = + executionGraph.getCheckpointCoordinator(); + + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally( + new IllegalStateException( + String.format( + "Job %s is not a streaming job.", executionGraph.getJobID()))); + } + + if (targetDirectory == null + && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) { + logger.info( + "Trying to cancel job {} with savepoint, but no savepoint directory configured.", + executionGraph.getJobID()); + + return FutureUtils.completedExceptionally( + new IllegalStateException( + "No savepoint directory configured. You can either specify a directory " + + "while cancelling via -s :targetDirectory or configure a cluster-wide " + + "default via key '" + + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + + "'.")); + } + + logger.info("Triggering stop-with-savepoint for job {}.", executionGraph.getJobID()); + + // we stop the checkpoint coordinator so that we are guaranteed + // to have only the data of the synchronous savepoint committed. + // in case of failure, and if the job restarts, the coordinator + // will be restarted by the CheckpointCoordinatorDeActivator. + checkpointCoordinator.stopCheckpointScheduler(); + + final CompletableFuture<String> savepointFuture = + checkpointCoordinator + .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer); + + final CompletableFuture<JobStatus> terminationFuture = + executionGraph + .getTerminationFuture() + .handle( + (jobstatus, throwable) -> { + if (throwable != null) { + logger.info( + "Failed during stopping job {} with a savepoint. 
Reason: {}", + executionGraph.getJobID(), + throwable.getMessage()); + throw new CompletionException(throwable); + } else if (jobstatus != JobStatus.FINISHED) { + logger.info( + "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", + executionGraph.getJobID(), + jobstatus); + throw new CompletionException( + new FlinkException( + "Reached state " + + jobstatus + + " instead of FINISHED.")); + } + return jobstatus; + }); + + return savepointFuture + .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) + .handleAsync( + (path, throwable) -> { + if (throwable != null) { + if (context.isState(this)) { + // restart the checkpoint coordinator if stopWithSavepoint + // failed. + startCheckpointScheduler(checkpointCoordinator); + } + throw new CompletionException(throwable); + } + if (ensureAllExecutionsFinished()) { + Throwable error = + new FlinkException( + "Not all executions were stopped with a savepoint."); + handleGlobalFailure(error); + throw new CompletionException(error); + } + + return path; + }, + context.getMainThreadExecutor()); + } Review comment: Okay. I filed a ticket: https://issues.apache.org/jira/browse/FLINK-21333 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
