[hotfix] Improve code structure of JobMaster#rescaleOperators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e461208a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e461208a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e461208a Branch: refs/heads/master Commit: e461208a9473036b41fb17a737f5ac18cd9c49f7 Parents: 3969170 Author: Till Rohrmann <[email protected]> Authored: Fri Feb 23 18:51:44 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sat Feb 24 15:05:14 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 228 +++++++++++-------- 1 file changed, 134 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e461208a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index ecb2280..bfe8aaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -468,26 +468,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } // 1. Check whether we can rescale the job & rescale the respective vertices - for (JobVertexID jobVertexId : operators) { - final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); - - // update max parallelism in case that it has not been configured - final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); - - if (executionJobVertex != null) { - jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); - } - - try { - rescalingBehaviour.acceptWithException(jobVertex, newParallelism); - } catch (FlinkException e) { - final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); - - log.info(msg, e); + try { + rescaleJobGraph(operators, newParallelism, rescalingBehaviour); + } catch (FlinkException e) { + final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); - return FutureUtils.completedExceptionally( - new JobModificationException(msg, e)); - } + log.info(msg, e); + return FutureUtils.completedExceptionally(new JobModificationException(msg, e)); } final ExecutionGraph currentExecutionGraph = executionGraph; @@ -521,82 +508,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast checkpointCoordinator.stopCheckpointScheduler(); // 4. take a savepoint - final CompletableFuture<String> savepointFuture = triggerSavepoint( - null, - timeout) - .handleAsync( - (String savepointPath, Throwable throwable) -> { - if (throwable != null) { - final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - if (strippedThrowable instanceof CheckpointTriggerException) { - final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + final CompletableFuture<String> savepointFuture = getJobModificationSavepoint(timeout); - if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { - return lastInternalSavepoint; - } else { - throw new CompletionException(checkpointTriggerException); - } - } else { - throw new CompletionException(strippedThrowable); - } - } else { - final String savepointToDispose = lastInternalSavepoint; - lastInternalSavepoint = savepointPath; - - if (savepointToDispose != null) { - // dispose the old savepoint asynchronously - CompletableFuture.runAsync( - () -> disposeSavepoint(savepointToDispose), - scheduledExecutorService); - } - - return lastInternalSavepoint; - } - }, - getMainThreadExecutor()); - - final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture - .thenApplyAsync( - (@Nullable String savepointPath) -> { - if (savepointPath != null) { - try { - tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, SavepointRestoreSettings.forPath(savepointPath, false)); - } catch (Exception e) { - final String message = String.format("Could not restore from temporary rescaling savepoint. This might indicate " + - "that the savepoint %s got corrupted. Deleting this savepoint as a precaution.", - savepointPath); - - log.info(message); - - CompletableFuture - .runAsync( - () -> { - if (savepointPath.equals(lastInternalSavepoint)) { - lastInternalSavepoint = null; - } - }, - getMainThreadExecutor()) - .thenRunAsync( - () -> disposeSavepoint(savepointPath), - scheduledExecutorService); - - throw new CompletionException(new JobModificationException(message, e)); - } - } else { - try { - tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings()); - } catch (Exception e) { - final String message = String.format("Could not restore from initial savepoint. This might indicate " + - "that the savepoint %s got corrupted.", jobGraph.getSavepointRestoreSettings().getRestorePath()); - - log.info(message); - - throw new CompletionException(new JobModificationException(message, e)); - } - } - - return newExecutionGraph; - }, scheduledExecutorService) + final CompletableFuture<ExecutionGraph> executionGraphFuture = restoreExecutionGraphFromRescalingSavepoint( + newExecutionGraph, + savepointFuture) .handleAsync( (ExecutionGraph executionGraph, Throwable failure) -> { if (failure != null) { @@ -1332,6 +1248,130 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast slotPoolGateway.disconnectResourceManager(); } + /** + * Restore the given {@link ExecutionGraph} from the rescaling savepoint. If the {@link ExecutionGraph} could + * be restored, then this savepoint will be recorded as the latest successful modification savepoint. A previous + * savepoint will be disposed. If the rescaling savepoint is empty, the job will be restored from the initially + * provided savepoint. + * + * @param newExecutionGraph to restore + * @param savepointFuture containing the path to the internal modification savepoint + * @return Future which is completed with the restored {@link ExecutionGraph} + */ + private CompletableFuture<ExecutionGraph> restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph newExecutionGraph, CompletableFuture<String> savepointFuture) { + return savepointFuture + .thenApplyAsync( + (@Nullable String savepointPath) -> { + if (savepointPath != null) { + try { + tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, SavepointRestoreSettings.forPath(savepointPath, false)); + } catch (Exception e) { + final String message = String.format("Could not restore from temporary rescaling savepoint. This might indicate " + + "that the savepoint %s got corrupted. Deleting this savepoint as a precaution.", + savepointPath); + + log.info(message); + + CompletableFuture + .runAsync( + () -> { + if (savepointPath.equals(lastInternalSavepoint)) { + lastInternalSavepoint = null; + } + }, + getMainThreadExecutor()) + .thenRunAsync( + () -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + throw new CompletionException(new JobModificationException(message, e)); + } + } else { + // No rescaling savepoint, restart from the initial savepoint or none + try { + tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings()); + } catch (Exception e) { + final String message = String.format("Could not restore from initial savepoint. This might indicate " + + "that the savepoint %s got corrupted.", jobGraph.getSavepointRestoreSettings().getRestorePath()); + + log.info(message); + + throw new CompletionException(new JobModificationException(message, e)); + } + } + + return newExecutionGraph; + }, scheduledExecutorService); + } + + /** + * Takes an internal savepoint for job modification purposes. If the savepoint was not successful because + * not all tasks were running, it returns the last successful modification savepoint. + * + * @param timeout for the operation + * @return Future which is completed with the savepoint path or the last successful modification savepoint if the + * former was not successful + */ + private CompletableFuture<String> getJobModificationSavepoint(Time timeout) { + return triggerSavepoint( + null, + timeout) + .handleAsync( + (String savepointPath, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof CheckpointTriggerException) { + final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + + if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + return lastInternalSavepoint; + } else { + throw new CompletionException(checkpointTriggerException); + } + } else { + throw new CompletionException(strippedThrowable); + } + } else { + final String savepointToDispose = lastInternalSavepoint; + lastInternalSavepoint = savepointPath; + + if (savepointToDispose != null) { + // dispose the old savepoint asynchronously + CompletableFuture.runAsync( + () -> disposeSavepoint(savepointToDispose), + scheduledExecutorService); + } + + return lastInternalSavepoint; + } + }, + getMainThreadExecutor()); + } + + /** + * Rescales the given operators of the {@link JobGraph} of this {@link JobMaster} with respect to given + * parallelism and {@link RescalingBehaviour}. + * + * @param operators to rescale + * @param newParallelism new parallelism for these operators + * @param rescalingBehaviour of the rescaling operation + * @throws FlinkException if the {@link JobGraph} could not be rescaled + */ + private void rescaleJobGraph(Collection<JobVertexID> operators, int newParallelism, RescalingBehaviour rescalingBehaviour) throws FlinkException { + for (JobVertexID jobVertexId : operators) { + final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + + // update max parallelism in case that it has not been configured + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + if (executionJobVertex != null) { + jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); + } + + rescalingBehaviour.acceptWithException(jobVertex, newParallelism); + } + } + //---------------------------------------------------------------------------------------------- // Utility classes //----------------------------------------------------------------------------------------------
