[hotfix] Improve code structure of JobMaster#rescaleOperators

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e461208a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e461208a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e461208a

Branch: refs/heads/master
Commit: e461208a9473036b41fb17a737f5ac18cd9c49f7
Parents: 3969170
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 23 18:51:44 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sat Feb 24 15:05:14 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 228 +++++++++++--------
 1 file changed, 134 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e461208a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index ecb2280..bfe8aaa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -468,26 +468,13 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
 
                // 1. Check whether we can rescale the job & rescale the 
respective vertices
-               for (JobVertexID jobVertexId : operators) {
-                       final JobVertex jobVertex = 
jobGraph.findVertexByID(jobVertexId);
-
-                       // update max parallelism in case that it has not been 
configured
-                       final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
-
-                       if (executionJobVertex != null) {
-                               
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
-                       }
-
-                       try {
-                               
rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
-                       } catch (FlinkException e) {
-                               final String msg = String.format("Cannot 
rescale job %s.", jobGraph.getName());
-
-                               log.info(msg, e);
+               try {
+                       rescaleJobGraph(operators, newParallelism, 
rescalingBehaviour);
+               } catch (FlinkException e) {
+                       final String msg = String.format("Cannot rescale job 
%s.", jobGraph.getName());
 
-                               return FutureUtils.completedExceptionally(
-                                       new JobModificationException(msg, e));
-                       }
+                       log.info(msg, e);
+                       return FutureUtils.completedExceptionally(new 
JobModificationException(msg, e));
                }
 
                final ExecutionGraph currentExecutionGraph = executionGraph;
@@ -521,82 +508,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                checkpointCoordinator.stopCheckpointScheduler();
 
                // 4. take a savepoint
-               final CompletableFuture<String> savepointFuture = 
triggerSavepoint(
-                       null,
-                       timeout)
-                       .handleAsync(
-                               (String savepointPath, Throwable throwable) -> {
-                                       if (throwable != null) {
-                                               final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-                                               if (strippedThrowable 
instanceof CheckpointTriggerException) {
-                                                       final 
CheckpointTriggerException checkpointTriggerException = 
(CheckpointTriggerException) strippedThrowable;
+               final CompletableFuture<String> savepointFuture = 
getJobModificationSavepoint(timeout);
 
-                                                       if 
(checkpointTriggerException.getCheckpointDeclineReason() == 
CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
-                                                               return 
lastInternalSavepoint;
-                                                       } else {
-                                                               throw new 
CompletionException(checkpointTriggerException);
-                                                       }
-                                               } else {
-                                                       throw new 
CompletionException(strippedThrowable);
-                                               }
-                                       } else {
-                                               final String savepointToDispose 
= lastInternalSavepoint;
-                                               lastInternalSavepoint = 
savepointPath;
-
-                                               if (savepointToDispose != null) 
{
-                                                       // dispose the old 
savepoint asynchronously
-                                                       
CompletableFuture.runAsync(
-                                                               () -> 
disposeSavepoint(savepointToDispose),
-                                                               
scheduledExecutorService);
-                                               }
-
-                                               return lastInternalSavepoint;
-                                       }
-                               },
-                               getMainThreadExecutor());
-
-               final CompletableFuture<ExecutionGraph> executionGraphFuture = 
savepointFuture
-                       .thenApplyAsync(
-                               (@Nullable String savepointPath) -> {
-                                       if (savepointPath != null) {
-                                               try {
-                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
SavepointRestoreSettings.forPath(savepointPath, false));
-                                               } catch (Exception e) {
-                                                       final String message = 
String.format("Could not restore from temporary rescaling savepoint. This might 
indicate " +
-                                                                       "that 
the savepoint %s got corrupted. Deleting this savepoint as a precaution.",
-                                                               savepointPath);
-
-                                                       log.info(message);
-
-                                                       CompletableFuture
-                                                               .runAsync(
-                                                                       () -> {
-                                                                               
if (savepointPath.equals(lastInternalSavepoint)) {
-                                                                               
        lastInternalSavepoint = null;
-                                                                               
}
-                                                                       },
-                                                                       
getMainThreadExecutor())
-                                                               .thenRunAsync(
-                                                                       () -> 
disposeSavepoint(savepointPath),
-                                                                       
scheduledExecutorService);
-
-                                                       throw new 
CompletionException(new JobModificationException(message, e));
-                                               }
-                                       } else {
-                                               try {
-                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
-                                               } catch (Exception e) {
-                                                       final String message = 
String.format("Could not restore from initial savepoint. This might indicate " +
-                                                               "that the 
savepoint %s got corrupted.", 
jobGraph.getSavepointRestoreSettings().getRestorePath());
-
-                                                       log.info(message);
-
-                                                       throw new 
CompletionException(new JobModificationException(message, e));
-                                               }
-                                       }
-
-                                       return newExecutionGraph;
-                               }, scheduledExecutorService)
+               final CompletableFuture<ExecutionGraph> executionGraphFuture = 
restoreExecutionGraphFromRescalingSavepoint(
+                       newExecutionGraph,
+                       savepointFuture)
                        .handleAsync(
                                (ExecutionGraph executionGraph, Throwable 
failure) -> {
                                        if (failure != null) {
@@ -1332,6 +1248,130 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                slotPoolGateway.disconnectResourceManager();
        }
 
+       /**
+        * Restores the given {@link ExecutionGraph} from the savepoint path delivered by {@code savepointFuture}.
+        *
+        * <p>Once {@code savepointFuture} completes, the restore runs asynchronously on the
+        * {@code scheduledExecutorService}:
+        * <ul>
+        *     <li>If the future yields a non-null path, the graph is restored from that rescaling savepoint. If this
+        *     fails, the savepoint is treated as unusable: {@code lastInternalSavepoint} is cleared on the main
+        *     thread (only if it still points to this savepoint, i.e. it has not been replaced in the meantime),
+        *     the savepoint is afterwards disposed on the {@code scheduledExecutorService}, and the returned future
+        *     is completed exceptionally without waiting for the clean-up.</li>
+        *     <li>If the future yields {@code null} (no rescaling savepoint available), the graph is restored with
+        *     the {@link SavepointRestoreSettings} of the {@link JobGraph}, i.e. from the initially provided
+        *     savepoint or from none.</li>
+        * </ul>
+        *
+        * <p>This method does NOT record a successfully used savepoint as the latest modification savepoint and
+        * does NOT dispose older savepoints; that bookkeeping is done when the savepoint is taken
+        * (see {@code getJobModificationSavepoint}).
+        *
+        * @param newExecutionGraph to restore; the same instance is returned through the resulting future
+        * @param savepointFuture containing the path to the internal modification savepoint, or {@code null} if
+        *                        there is none
+        * @return Future which is completed with the restored {@link ExecutionGraph}. It completes exceptionally
+        * with a {@link CompletionException} wrapping a {@link JobModificationException} if restoring fails, and
+        * it also completes exceptionally if {@code savepointFuture} itself completes exceptionally.
+        */
+       private CompletableFuture<ExecutionGraph> 
restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph newExecutionGraph, 
CompletableFuture<String> savepointFuture) {
+               return savepointFuture
+                       .thenApplyAsync(
+                               (@Nullable String savepointPath) -> {
+                                       if (savepointPath != null) {
+                                               try {
+                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
SavepointRestoreSettings.forPath(savepointPath, false));
+                                               } catch (Exception e) {
+                                                       final String message = 
String.format("Could not restore from temporary rescaling savepoint. This might 
indicate " +
+                                                                       "that 
the savepoint %s got corrupted. Deleting this savepoint as a precaution.",
+                                                               savepointPath);
+
+                                                       log.info(message);
+                                                       // forget the unusable savepoint on the main thread (unless it was replaced meanwhile), then dispose it; this clean-up is not awaited
+                                                       CompletableFuture
+                                                               .runAsync(
+                                                                       () -> {
+                                                                               
if (savepointPath.equals(lastInternalSavepoint)) {
+                                                                               
        lastInternalSavepoint = null;
+                                                                               
}
+                                                                       },
+                                                                       
getMainThreadExecutor())
+                                                               .thenRunAsync(
+                                                                       () -> 
disposeSavepoint(savepointPath),
+                                                                       
scheduledExecutorService);
+
+                                                       throw new 
CompletionException(new JobModificationException(message, e));
+                                               }
+                                       } else {
+                                               // No rescaling savepoint, 
restart from the initial savepoint or none
+                                               try {
+                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
+                                               } catch (Exception e) {
+                                                       final String message = 
String.format("Could not restore from initial savepoint. This might indicate " +
+                                                               "that the 
savepoint %s got corrupted.", 
jobGraph.getSavepointRestoreSettings().getRestorePath());
+
+                                                       log.info(message);
+
+                                                       throw new 
CompletionException(new JobModificationException(message, e));
+                                               }
+                                       }
+                                       // only reached if one of the restore attempts above succeeded
+                                       return newExecutionGraph;
+                               }, scheduledExecutorService);
+       }
+
+       /**
+        * Takes an internal savepoint for job modification purposes.
+        *
+        * <p>The outcome of the savepoint is handled on the main thread executor, which is where
+        * {@code lastInternalSavepoint} is read and updated by this method:
+        * <ul>
+        *     <li>On success, the new savepoint path becomes {@code lastInternalSavepoint} and the previously
+        *     recorded internal savepoint (if any) is disposed asynchronously on the
+        *     {@code scheduledExecutorService}.</li>
+        *     <li>If the savepoint is declined with
+        *     {@link CheckpointDeclineReason#NOT_ALL_REQUIRED_TASKS_RUNNING}, the last successful modification
+        *     savepoint is returned instead. This may be {@code null} if no such savepoint has been taken yet.</li>
+        *     <li>Any other failure completes the returned future exceptionally with a {@link CompletionException}
+        *     wrapping the cause as returned by {@code ExceptionUtils.stripCompletionException}.</li>
+        * </ul>
+        *
+        * <p>NOTE(review): the savepoint is triggered with a {@code null} target directory; presumably this means
+        * the configured default savepoint directory is used. Confirm against {@code triggerSavepoint}.
+        *
+        * @param timeout for the operation
+        * @return Future which is completed with the new savepoint path, or with the last successful modification
+        * savepoint (possibly {@code null}) if the savepoint was declined because not all required tasks were running
+        */
+       private CompletableFuture<String> getJobModificationSavepoint(Time 
timeout) {
+               return triggerSavepoint(
+                       null,
+                       timeout)
+                       .handleAsync(
+                               (String savepointPath, Throwable throwable) -> {
+                                       if (throwable != null) {
+                                               final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+                                               if (strippedThrowable 
instanceof CheckpointTriggerException) {
+                                                       final 
CheckpointTriggerException checkpointTriggerException = 
(CheckpointTriggerException) strippedThrowable;
+                                                       // a savepoint declined because not all required tasks run is tolerated: fall back to the last internal savepoint (may be null)
+                                                       if 
(checkpointTriggerException.getCheckpointDeclineReason() == 
CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
+                                                               return 
lastInternalSavepoint;
+                                                       } else {
+                                                               throw new 
CompletionException(checkpointTriggerException);
+                                                       }
+                                               } else {
+                                                       throw new 
CompletionException(strippedThrowable);
+                                               }
+                                       } else {
+                                               final String savepointToDispose 
= lastInternalSavepoint;
+                                               lastInternalSavepoint = 
savepointPath;
+
+                                               if (savepointToDispose != null) 
{
+                                                       // dispose the old 
savepoint asynchronously
+                                                       
CompletableFuture.runAsync(
+                                                               () -> 
disposeSavepoint(savepointToDispose),
+                                                               
scheduledExecutorService);
+                                               }
+
+                                               return lastInternalSavepoint;
+                                       }
+                               },
+                               getMainThreadExecutor());
+       }
+
+       /**
+        * Rescales the given operators of the {@link JobGraph} of this {@link JobMaster} with respect to the given
+        * parallelism and {@link RescalingBehaviour}.
+        *
+        * <p>Before a vertex is rescaled, its max parallelism is set to the one of the corresponding
+        * {@link ExecutionJobVertex} of the current {@link ExecutionGraph}, if such a vertex exists (in case the max
+        * parallelism has not been configured explicitly).
+        *
+        * <p>All operator ids are resolved before any vertex is modified, so an unknown operator id fails the call
+        * without touching the {@link JobGraph}. Note that if the {@link RescalingBehaviour} fails for one operator,
+        * the operators processed before it remain rescaled.
+        *
+        * @param operators to rescale
+        * @param newParallelism new parallelism for these operators
+        * @param rescalingBehaviour of the rescaling operation
+        * @throws FlinkException if one of the operators is not part of the {@link JobGraph}, or if the
+        * {@link RescalingBehaviour} fails with a {@link FlinkException}
+        */
+       private void rescaleJobGraph(Collection<JobVertexID> operators, int newParallelism, RescalingBehaviour rescalingBehaviour) throws FlinkException {
+               // validate first: findVertexByID returns null for unknown ids, which would otherwise surface as an
+               // unhandled NullPointerException after earlier operators had already been modified
+               for (JobVertexID jobVertexId : operators) {
+                       if (jobGraph.findVertexByID(jobVertexId) == null) {
+                               throw new FlinkException(String.format("Cannot rescale operator %s because it is not part of the job graph.", jobVertexId));
+                       }
+               }
+
+               for (JobVertexID jobVertexId : operators) {
+                       final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+                       // update max parallelism in case that it has not been configured
+                       final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+                       if (executionJobVertex != null) {
+                               jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+                       }
+
+                       rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+               }
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Utility classes
        
//----------------------------------------------------------------------------------------------

Reply via email to