[FLINK-8634] [rest] Introduce job rescaling REST handler

Add the rescaling REST handler as a subclass of
AbstractAsynchronousOperationHandlers.

This closes #5451.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06acdc19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06acdc19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06acdc19

Branch: refs/heads/master
Commit: 06acdc1907300862d5faddc4e882f5f6dd670edb
Parents: 4756573
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Feb 2 11:06:35 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 22 17:32:38 2018 +0100

----------------------------------------------------------------------
 .../async/AsynchronousOperationInfo.java        |  64 +++++++++++
 .../job/AsynchronousJobOperationKey.java        |  74 +++++++++++++
 .../rest/handler/job/RescalingHandlers.java     | 111 +++++++++++++++++++
 .../job/RescalingStatusMessageParameters.java   |  39 +++++++
 .../job/RescalingTriggerMessageParameters.java  |  40 +++++++
 .../job/savepoints/SavepointHandlers.java       |  60 +---------
 .../RescaleParallelismQueryParameter.java       |  41 +++++++
 .../RescalingParallelismQueryParameter.java     |  41 +++++++
 8 files changed, 416 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
new file mode 100644
index 0000000..a46fba9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import 
org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer;
+import 
org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer;
+import org.apache.flink.util.SerializedThrowable;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+/**
+ * Basic information object for asynchronous operations.
+ *
+ * <p>The object carries an optional failure cause. Instances are created via
+ * {@link #complete()} (no failure cause) or
+ * {@link #completeExceptional(SerializedThrowable)} (with failure cause).
+ * Because of {@code JsonInclude.Include.NON_NULL}, a {@code null} failure
+ * cause is left out of the JSON representation.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class AsynchronousOperationInfo {
+
+       /** JSON field name under which the failure cause is written and read. */
+       private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause";
+
+       /** Failure cause of the operation; {@code null} if none was given. */
+       @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
+       @JsonSerialize(using = SerializedThrowableSerializer.class)
+       @Nullable
+       private final SerializedThrowable failureCause;
+
+       /**
+        * Creates the info object; private, use the static factory methods.
+        *
+        * @param failureCause failure cause of the operation, may be {@code null}
+        */
+       private AsynchronousOperationInfo(
+               @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
+               @JsonDeserialize(using = SerializedThrowableDeserializer.class)
+               @Nullable SerializedThrowable failureCause) {
+               this.failureCause = failureCause;
+       }
+
+       /**
+        * Returns the failure cause.
+        *
+        * @return the failure cause or {@code null} if none was set
+        */
+       @Nullable
+       public SerializedThrowable getFailureCause() {
+               return failureCause;
+       }
+
+       /**
+        * Creates an info object carrying the given failure cause.
+        *
+        * @param serializedThrowable failure cause to report
+        * @return info object with the given failure cause
+        */
+       public static AsynchronousOperationInfo 
completeExceptional(SerializedThrowable serializedThrowable) {
+               return new AsynchronousOperationInfo(serializedThrowable);
+       }
+
+       /**
+        * Creates an info object without a failure cause.
+        *
+        * @return info object whose failure cause is {@code null}
+        */
+       public static AsynchronousOperationInfo complete() {
+               return new AsynchronousOperationInfo(null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
new file mode 100644
index 0000000..4bb473e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
+import org.apache.flink.runtime.rest.handler.async.OperationKey;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based
+ * collection.
+ *
+ * <p>Two keys are equal only if they have the same runtime class, the parent
+ * {@link OperationKey} considers them equal, and their job ids are equal.
+ *
+ * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
+ */
+@Immutable
+public class AsynchronousJobOperationKey extends OperationKey {
+
+       /** Id of the job the operation belongs to; never {@code null}. */
+       private final JobID jobId;
+
+       /**
+        * Creates the key; private, use {@link #of(TriggerId, JobID)}.
+        *
+        * @param triggerId trigger id, handed to the {@link OperationKey} constructor
+        * @param jobId job id, must not be {@code null}
+        * @throws NullPointerException if {@code jobId} is {@code null}
+        */
+       private AsynchronousJobOperationKey(final TriggerId triggerId, final 
JobID jobId) {
+               super(triggerId);
+               this.jobId = requireNonNull(jobId);
+       }
+
+       /**
+        * Creates a key for the given trigger id and job id.
+        *
+        * @param triggerId trigger id of the operation
+        * @param jobId job id of the operation, must not be {@code null}
+        * @return the new key
+        */
+       public static AsynchronousJobOperationKey of(final TriggerId triggerId, 
final JobID jobId) {
+               return new AsynchronousJobOperationKey(triggerId, jobId);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               // exact class match, so instances of subclasses never equal this key
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               // the parent's state has to match before the job id is compared
+               if (!super.equals(o)) {
+                       return false;
+               }
+
+               AsynchronousJobOperationKey that = 
(AsynchronousJobOperationKey) o;
+               return Objects.equals(jobId, that.jobId);
+       }
+
+       @Override
+       public int hashCode() {
+               // combines the parent's hash with the job id, consistent with equals
+               return Objects.hash(super.hashCode(), jobId);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
new file mode 100644
index 0000000..6f8895a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RescaleParallelismQueryParameter;
+import 
org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedThrowable;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Rest handler to trigger and poll the rescaling of a running job.
+ */
+public class RescalingHandlers extends 
AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> 
{
+
+       /**
+        * Handler which triggers the rescaling of the specified job.
+        */
+       public class RescalingTriggerHandler extends 
TriggerHandler<RestfulGateway, EmptyRequestBody, 
RescalingTriggerMessageParameters> {
+
+               protected RescalingTriggerHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, 
Time timeout, Map<String, String> responseHeaders, 
MessageHeaders<EmptyRequestBody, TriggerResponse, 
RescalingTriggerMessageParameters> messageHeaders) {
+                       super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+               }
+
+               @Override
+               protected CompletableFuture<Acknowledge> 
triggerOperation(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws 
RestHandlerException {
+                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+                       final List<Integer> queryParameter = 
request.getQueryParameter(RescaleParallelismQueryParameter.class);
+
+                       if (queryParameter.isEmpty()) {
+                               throw new RestHandlerException("No new 
parallelism was specified.", HttpResponseStatus.BAD_REQUEST);
+                       }
+
+                       final int newParallelism = queryParameter.get(0);
+
+                       final CompletableFuture<Acknowledge> rescalingFuture = 
gateway.rescaleJob(jobId, newParallelism, RescalingBehaviour.STRICT, timeout);
+
+                       return rescalingFuture;
+               }
+
+               @Override
+               protected AsynchronousJobOperationKey 
createOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request) {
+                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+                       return AsynchronousJobOperationKey.of(new TriggerId(), 
jobId);
+               }
+       }
+
+       /**
+        * Handler which reports the status of the rescaling operation.
+        */
+       public class RescalingStatusHandler extends 
StatusHandler<RestfulGateway, AsynchronousOperationInfo, 
RescalingStatusMessageParameters> {
+
+               protected RescalingStatusHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, 
Time timeout, Map<String, String> responseHeaders, 
MessageHeaders<EmptyRequestBody, 
AsynchronousOperationResult<AsynchronousOperationInfo>, 
RescalingStatusMessageParameters> messageHeaders) {
+                       super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+               }
+
+               @Override
+               protected AsynchronousJobOperationKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingStatusMessageParameters> request) {
+                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+                       final TriggerId triggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
+
+                       return AsynchronousJobOperationKey.of(triggerId, jobId);
+               }
+
+               @Override
+               protected AsynchronousOperationInfo 
exceptionalOperationResultResponse(Throwable throwable) {
+                       return 
AsynchronousOperationInfo.completeExceptional(new 
SerializedThrowable(throwable));
+               }
+
+               @Override
+               protected AsynchronousOperationInfo 
operationResultResponse(Acknowledge operationResult) {
+                       return AsynchronousOperationInfo.complete();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
new file mode 100644
index 0000000..4821b4f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message parameters for the {@link RescalingHandlers.RescalingStatusHandler}.
+ *
+ * <p>Adds a {@link TriggerIdPathParameter} to the path parameters so that a
+ * status request names both the job and the triggered operation.
+ */
+public class RescalingStatusMessageParameters extends JobMessageParameters {
+
+       /** Path parameter carrying the trigger id of the rescaling operation. */
+       public final TriggerIdPathParameter triggerIdPathParameter = new 
TriggerIdPathParameter();
+
+       /**
+        * Returns the job path parameter followed by the trigger id path parameter.
+        */
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               // jobPathParameter is not declared in this class; presumably inherited
+               // from JobMessageParameters
+               return Arrays.asList(jobPathParameter, triggerIdPathParameter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
new file mode 100644
index 0000000..096baa1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import 
org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for triggering the rescaling of a job.
+ *
+ * <p>Declares a single query parameter, the
+ * {@link RescalingParallelismQueryParameter}.
+ */
+public class RescalingTriggerMessageParameters extends JobMessageParameters {
+
+       /** Query parameter carrying the new parallelism of the job. */
+       public final RescalingParallelismQueryParameter 
rescalingParallelismQueryParameter = new RescalingParallelismQueryParameter();
+
+       /**
+        * Returns a collection containing only the parallelism query parameter.
+        */
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return 
Collections.singleton(rescalingParallelismQueryParameter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
index 5915d72..cb3ff5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
-import org.apache.flink.runtime.rest.handler.async.OperationKey;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.TriggerId;
@@ -43,14 +43,10 @@ import org.apache.flink.util.SerializedThrowable;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.Immutable;
 
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
-import static java.util.Objects.requireNonNull;
-
 /**
  * HTTP handlers for asynchronous triggering of savepoints.
  *
@@ -96,7 +92,7 @@ import static java.util.Objects.requireNonNull;
  * }
  * </pre>
  */
-public class SavepointHandlers extends 
AbstractAsynchronousOperationHandlers<SavepointHandlers.SavepointKey, String> {
+public class SavepointHandlers extends 
AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, String> {
 
        @Nullable
        private final String defaultSavepointDir;
@@ -136,9 +132,9 @@ public class SavepointHandlers extends 
AbstractAsynchronousOperationHandlers<Sav
                }
 
                @Override
-               protected SavepointKey 
createOperationKey(HandlerRequest<SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> request) {
+               protected AsynchronousJobOperationKey 
createOperationKey(HandlerRequest<SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> request) {
                        final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       return SavepointKey.of(new TriggerId(), jobId);
+                       return AsynchronousJobOperationKey.of(new TriggerId(), 
jobId);
                }
        }
 
@@ -156,10 +152,10 @@ public class SavepointHandlers extends 
AbstractAsynchronousOperationHandlers<Sav
                }
 
                @Override
-               protected SavepointKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
SavepointStatusMessageParameters> request) {
+               protected AsynchronousJobOperationKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
SavepointStatusMessageParameters> request) {
                        final TriggerId triggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
                        final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       return SavepointKey.of(triggerId, jobId);
+                       return AsynchronousJobOperationKey.of(triggerId, jobId);
                }
 
                @Override
@@ -172,48 +168,4 @@ public class SavepointHandlers extends 
AbstractAsynchronousOperationHandlers<Sav
                        return new SavepointInfo(operationResult, null);
                }
        }
-
-       /**
-        * A pair of {@link JobID} and {@link TriggerId} used as a key to a 
hash based
-        * collection.
-        *
-        * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
-        */
-       @Immutable
-       public static class SavepointKey extends OperationKey {
-
-               private final JobID jobId;
-
-               private SavepointKey(final TriggerId triggerId, final JobID 
jobId) {
-                       super(triggerId);
-                       this.jobId = requireNonNull(jobId);
-               }
-
-               private static SavepointKey of(final TriggerId triggerId, final 
JobID jobId) {
-                       return new SavepointKey(triggerId, jobId);
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-
-                       if (!super.equals(o)) {
-                               return false;
-                       }
-
-                       SavepointKey that = (SavepointKey) o;
-                       return Objects.equals(jobId, that.jobId);
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(super.hashCode(), jobId);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
new file mode 100644
index 0000000..8058ab9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Parallelism for the rescaling of jobs specified as a {@link 
MessageQueryParameter}.
+ *
+ * <p>NOTE(review): this class appears to duplicate
+ * RescalingParallelismQueryParameter (same key, same requisiteness, same
+ * conversions); only the constructor visibility differs. Confirm which of the
+ * two is meant to be kept.
+ */
+public class RescaleParallelismQueryParameter extends 
MessageQueryParameter<Integer> {
+
+       /** Key of the query parameter. */
+       public static final String KEY = "parallelism";
+
+       /**
+        * Creates the parameter with key {@link #KEY}, marked as mandatory.
+        * The constructor is protected, so classes in other packages can only
+        * instantiate it through a subclass.
+        */
+       protected RescaleParallelismQueryParameter() {
+               super(KEY, MessageParameterRequisiteness.MANDATORY);
+       }
+
+       /**
+        * Parses the given string into an {@link Integer}.
+        *
+        * @throws NumberFormatException if the string is not a parsable integer
+        */
+       @Override
+       public Integer convertValueFromString(String value) {
+               return Integer.valueOf(value);
+       }
+
+       /**
+        * Returns the string form of the given value; despite the method name the
+        * conversion goes from the value to its string.
+        */
+       @Override
+       public String convertStringToValue(Integer value) {
+               return value.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06acdc19/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java
new file mode 100644
index 0000000..9230d79
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescalingParallelismQueryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Parallelism for the rescaling of jobs specified as a {@link 
MessageQueryParameter}.
+ *
+ * <p>The parameter uses the key {@link #KEY} and is mandatory.
+ */
+public class RescalingParallelismQueryParameter extends 
MessageQueryParameter<Integer> {
+
+       /** Key of the query parameter. */
+       public static final String KEY = "parallelism";
+
+       /** Creates the parameter with key {@link #KEY}, marked as mandatory. */
+       public RescalingParallelismQueryParameter() {
+               super(KEY, MessageParameterRequisiteness.MANDATORY);
+       }
+
+       /**
+        * Parses the given string into an {@link Integer}.
+        *
+        * @throws NumberFormatException if the string is not a parsable integer
+        */
+       @Override
+       public Integer convertValueFromString(String value) {
+               return Integer.valueOf(value);
+       }
+
+       /**
+        * Returns the string form of the given value; despite the method name the
+        * conversion goes from the value to its string.
+        */
+       @Override
+       public String convertStringToValue(Integer value) {
+               return value.toString();
+       }
+}

Reply via email to