[FLINK-8656] [flip6] Add modify CLI command to rescale Flink jobs

Jobs can now be rescaled by calling flink modify <JOB_ID> -p <PARALLELISM>.
Internally, the CliFrontend will send the corresponding REST call and poll
for status updates.

This closes #5487.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0651876a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0651876a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0651876a

Branch: refs/heads/master
Commit: 0651876ae4d0c790904f60b1728e9009fd4e52a4
Parents: 4e84097
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Feb 13 17:29:32 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 22 17:32:39 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |  59 ++++++++
 .../flink/client/cli/CliFrontendParser.java     |  27 ++++
 .../flink/client/program/ClusterClient.java     |  11 ++
 .../client/program/rest/RestClusterClient.java  |  49 +++++++
 .../flink/client/cli/CliFrontendModifyTest.java | 137 +++++++++++++++++++
 .../util/function/BiConsumerWithException.java  |   4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  14 +-
 .../async/AsynchronousOperationInfo.java        |   6 +-
 8 files changed, 299 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index a849de1..92d2ccb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -84,6 +84,9 @@ import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION;
+
 /**
  * Implementation of a simple command line frontend for executing programs.
  */
@@ -98,6 +101,7 @@ public class CliFrontend {
        private static final String ACTION_CANCEL = "cancel";
        private static final String ACTION_STOP = "stop";
        private static final String ACTION_SAVEPOINT = "savepoint";
+       private static final String ACTION_MODIFY = "modify";
 
        // configuration dir parameters
        private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
@@ -714,6 +718,58 @@ public class CliFrontend {
                logAndSysout("Savepoint '" + savepointPath + "' disposed.");
        }
 
+       protected void modify(String[] args) throws CliArgsException, 
FlinkException {
+               LOG.info("Running 'modify' command.");
+
+               final Options commandOptions = 
CliFrontendParser.getModifyOptions();
+
+               final Options commandLineOptions = 
CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
+
+               final CommandLine commandLine = 
CliFrontendParser.parse(commandLineOptions, args, false);
+
+               if (commandLine.hasOption(HELP_OPTION.getOpt())) {
+                       
CliFrontendParser.printHelpForModify(customCommandLines);
+               }
+
+               final JobID jobId;
+               final String[] modifyArgs = commandLine.getArgs();
+
+               if (modifyArgs.length > 0) {
+                       jobId = parseJobId(modifyArgs[0]);
+               } else {
+                       throw new CliArgsException("Missing JobId");
+               }
+
+               final int newParallelism;
+               if (commandLine.hasOption(MODIFY_PARALLELISM_OPTION.getOpt())) {
+                       try {
+                               newParallelism = 
Integer.parseInt(commandLine.getOptionValue(MODIFY_PARALLELISM_OPTION.getOpt()));
+                       } catch (NumberFormatException e) {
+                               throw new CliArgsException("Could not parse the parallelism which is supposed to be an integer.", e);
+                       }
+               } else {
+                       throw new CliArgsException("Missing new parallelism.");
+               }
+
+               final CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(commandLine);
+
+               logAndSysout("Modify job " + jobId + '.');
+               runClusterAction(
+                       activeCommandLine,
+                       commandLine,
+                       clusterClient -> {
+                               CompletableFuture<Acknowledge> rescaleFuture = 
clusterClient.rescaleJob(jobId, newParallelism);
+
+                               try {
+                                       rescaleFuture.get();
+                               } catch (Exception e) {
+                                       throw new FlinkException("Could not rescale job " + jobId + '.', ExceptionUtils.stripExecutionException(e));
+                               }
+                               logAndSysout("Rescaled job " + jobId + ". Its new parallelism is " + newParallelism + '.');
+                       }
+               );
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Interaction with programs and JobManager
        // 
--------------------------------------------------------------------------------------------
@@ -977,6 +1033,9 @@ public class CliFrontend {
                                case ACTION_SAVEPOINT:
                                        savepoint(params);
                                        return 0;
+                               case ACTION_MODIFY:
+                                       modify(params);
+                                       return 0;
                                case "-h":
                                case "--help":
                                        
CliFrontendParser.printHelp(customCommandLines);

http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index e5d550f..eb68264 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -95,6 +95,8 @@ public class CliFrontendParser {
                        "directory is optional. If no directory is specified, 
the configured default " +
                        "directory (" + 
CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");
 
+       static final Option MODIFY_PARALLELISM_OPTION = new Option("p", 
"parallelism", true, "New parallelism for the specified job.");
+
        static {
                HELP_OPTION.setRequired(false);
 
@@ -134,6 +136,9 @@ public class CliFrontendParser {
                CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
                CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
                CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
+
+               MODIFY_PARALLELISM_OPTION.setRequired(false);
+               MODIFY_PARALLELISM_OPTION.setArgName("newParallelism");
        }
 
        private static final Options RUN_OPTIONS = getRunCommandOptions();
@@ -198,6 +203,12 @@ public class CliFrontendParser {
                return options.addOption(JAR_OPTION);
        }
 
+       static Options getModifyOptions() {
+               final Options options = buildGeneralOptions(new Options());
+               options.addOption(MODIFY_PARALLELISM_OPTION);
+               return options;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Help
        // 
--------------------------------------------------------------------------------------------
@@ -247,6 +258,7 @@ public class CliFrontendParser {
                printHelpForStop(customCommandLines);
                printHelpForCancel(customCommandLines);
                printHelpForSavepoint(customCommandLines);
+               printHelpForModify(customCommandLines);
 
                System.out.println();
        }
@@ -339,6 +351,21 @@ public class CliFrontendParser {
                System.out.println();
        }
 
+       public static void printHelpForModify(Collection<CustomCommandLine<?>> 
customCommandLines) {
+               HelpFormatter formatter = new HelpFormatter();
+               formatter.setLeftPadding(5);
+               formatter.setWidth(80);
+
+               System.out.println("\nAction \"modify\" modifies a running job (e.g. change of parallelism).");
+               System.out.println("\n  Syntax: modify <Job ID> [OPTIONS]");
+               formatter.setSyntaxPrefix("  \"modify\" action options:");
+               formatter.printHelp(" ", getModifyOptions());
+
+               printCustomCliOptions(customCommandLines, formatter, false);
+
+               System.out.println();
+       }
+
        /**
         * Prints custom cli options.
         * @param formatter The formatter to use for printing

http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 62be234..e2efbac 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -984,4 +984,15 @@ public abstract class ClusterClient<T> {
        protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
                throws ProgramInvocationException;
 
+       /**
+        * Rescales the specified job such that it will have the new 
parallelism.
+        *
+        * @param jobId specifying the job to modify
+        * @param newParallelism specifying the new parallelism of the rescaled 
job
+        * @return Future which is completed once the rescaling has been 
completed
+        */
+       public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int 
newParallelism) {
+               throw new UnsupportedOperationException("The " + 
getClass().getSimpleName() + " does not support rescaling.");
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 28bd2b5..8ad571f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -38,8 +38,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders;
+import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
+import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
+import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -401,6 +407,49 @@ public class RestClusterClient<T> extends ClusterClient<T> {
                        timeout);
        }
 
+       @Override
+       public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int 
newParallelism) {
+
+               final RescalingTriggerHeaders rescalingTriggerHeaders = 
RescalingTriggerHeaders.getInstance();
+               final RescalingTriggerMessageParameters 
rescalingTriggerMessageParameters = 
rescalingTriggerHeaders.getUnresolvedMessageParameters();
+               
rescalingTriggerMessageParameters.jobPathParameter.resolve(jobId);
+               
rescalingTriggerMessageParameters.rescalingParallelismQueryParameter.resolve(Collections.singletonList(newParallelism));
+
+               final CompletableFuture<TriggerResponse> 
rescalingTriggerResponseFuture = sendRequest(
+                       rescalingTriggerHeaders,
+                       rescalingTriggerMessageParameters,
+                       EmptyRequestBody.getInstance());
+
+               final CompletableFuture<AsynchronousOperationInfo> 
rescalingOperationFuture = rescalingTriggerResponseFuture.thenCompose(
+                       (TriggerResponse triggerResponse) -> {
+                               final TriggerId triggerId = 
triggerResponse.getTriggerId();
+
+                               return pollResourceAsync(
+                                       () -> {
+                                               final RescalingStatusHeaders 
rescalingStatusHeaders = RescalingStatusHeaders.getInstance();
+                                               final 
RescalingStatusMessageParameters rescalingStatusMessageParameters = 
rescalingStatusHeaders.getUnresolvedMessageParameters();
+
+                                               
rescalingStatusMessageParameters.jobPathParameter.resolve(jobId);
+                                               
rescalingStatusMessageParameters.triggerIdPathParameter.resolve(triggerId);
+                                               return sendRetryableRequest(
+                                                       rescalingStatusHeaders,
+                                                       
rescalingStatusMessageParameters,
+                                                       
EmptyRequestBody.getInstance(),
+                                                       
isConnectionProblemException());
+                                       }
+                               );
+                       });
+
+               return rescalingOperationFuture.thenApply(
+                       (AsynchronousOperationInfo asynchronousOperationInfo) 
-> {
+                               if (asynchronousOperationInfo.getFailureCause() 
== null) {
+                                       return Acknowledge.get();
+                               } else {
+                                       throw new 
CompletionException(asynchronousOperationInfo.getFailureCause());
+                               }
+                       });
+       }
+
        /**
         * Creates a {@code CompletableFuture} that polls a {@code 
AsynchronouslyCreatedResource} until
         * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} 
becomes

http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
new file mode 100644
index 0000000..50d87ba
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.util.MockedCliFrontend;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the modify command.
+ */
+public class CliFrontendModifyTest extends TestLogger {
+
+       @Test
+       public void testModifyJob() throws Exception {
+               final JobID jobId = new JobID();
+               final int parallelism = 42;
+               String[] args = {jobId.toString(), "-p", 
String.valueOf(parallelism)};
+
+               Tuple2<JobID, Integer> jobIdParallelism = callModify(args);
+
+               assertThat(jobIdParallelism.f0, Matchers.is(jobId));
+               assertThat(jobIdParallelism.f1, Matchers.is(parallelism));
+       }
+
+       @Test
+       public void testMissingJobId() throws Exception {
+               final int parallelism = 42;
+               final String[] args = {"-p", String.valueOf(parallelism)};
+
+               try {
+                       callModify(args);
+                       fail("Expected CliArgsException");
+               } catch (CliArgsException expected) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testMissingParallelism() throws Exception {
+               final JobID jobId = new JobID();
+               final String[] args = {jobId.toString()};
+
+               try {
+                       callModify(args);
+                       fail("Expected CliArgsException");
+               } catch (CliArgsException expected) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testUnparsableParalllelism() throws Exception {
+               final JobID jobId = new JobID();
+               final String[] args = {jobId.toString(), "-p", "foobar"};
+
+               try {
+                       callModify(args);
+                       fail("Expected CliArgsException");
+               } catch (CliArgsException expected) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testUnparsableJobId() throws Exception {
+               final int parallelism = 42;
+               final String[] args = {"foobar", "-p", 
String.valueOf(parallelism)};
+
+               try {
+                       callModify(args);
+                       fail("Expected CliArgsException");
+               } catch (CliArgsException expected) {
+                       // expected
+               }
+       }
+
+       private Tuple2<JobID, Integer> callModify(String[] args) throws 
Exception {
+               final CompletableFuture<Tuple2<JobID, Integer>> 
rescaleJobFuture = new CompletableFuture<>();
+               final TestingClusterClient clusterClient = new 
TestingClusterClient(rescaleJobFuture);
+               final MockedCliFrontend cliFrontend = new 
MockedCliFrontend(clusterClient);
+
+               cliFrontend.modify(args);
+
+               assertThat(rescaleJobFuture.isDone(), Matchers.is(true));
+
+               return rescaleJobFuture.get();
+       }
+
+       private static final class TestingClusterClient extends 
StandaloneClusterClient {
+
+               private final CompletableFuture<Tuple2<JobID, Integer>> 
rescaleJobFuture;
+
+               public TestingClusterClient(CompletableFuture<Tuple2<JobID, 
Integer>> rescaleJobFuture) throws Exception {
+                       super(new Configuration(), new 
TestingHighAvailabilityServices());
+
+                       this.rescaleJobFuture = rescaleJobFuture;
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, 
int newParallelism) {
+                       rescaleJobFuture.complete(Tuple2.of(jobId, 
newParallelism));
+
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 6ab1161..5864c8a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.util.function;
 
+import org.apache.flink.util.ExceptionUtils;
+
 import java.util.function.BiConsumer;
 
 /**
@@ -44,7 +46,7 @@ public interface BiConsumerWithException<T, U, E extends Throwable> extends BiCo
                try {
                        acceptWithException(t, u);
                } catch (Throwable e) {
-                       throw new RuntimeException(e);
+                       ExceptionUtils.rethrow(e);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 22c69f5..dd2a7ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -473,11 +473,17 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
                        int newParallelism,
                        RescalingBehaviour rescalingBehaviour,
                        Time timeout) {
+
+               if (newParallelism <= 0) {
+                       return FutureUtils.completedExceptionally(
+                               new JobModificationException("The target parallelism of a rescaling operation must be larger than 0."));
+               }
+
                // 1. Check whether we can rescale the job & rescale the 
respective vertices
                for (JobVertexID jobVertexId : operators) {
                        final JobVertex jobVertex = 
jobGraph.findVertexByID(jobVertexId);
 
-                       // update max parallelism in case that it has not been 
configure
+                       // update max parallelism in case that it has not been 
configured
                        final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
 
                        if (executionJobVertex != null) {
@@ -595,7 +601,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                                        return Acknowledge.get();
                                } else {
-                                       throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the resacling."));
+                                       throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the rescaling."));
                                }
 
                        },
@@ -1145,8 +1151,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                jobMasterConfiguration.getConfiguration(),
                                userCodeLoader,
                                log);
-               } catch (FlinkException | IOException de) {
-                       log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, de);
+               } catch (FlinkException | IOException e) {
+                       log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0651876a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
index a46fba9..7d75bf6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
@@ -43,9 +43,9 @@ public class AsynchronousOperationInfo {
        private final SerializedThrowable failureCause;
 
        private AsynchronousOperationInfo(
-               @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
-               @JsonDeserialize(using = SerializedThrowableDeserializer.class)
-               @Nullable SerializedThrowable failureCause) {
+                       @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
+                       @JsonDeserialize(using = 
SerializedThrowableDeserializer.class)
+                       @Nullable SerializedThrowable failureCause) {
                this.failureCause = failureCause;
        }
 

Reply via email to