[FLINK-8635] [rest] Register rescaling handlers at web endpoint

This closes #5454.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e840979
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e840979
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e840979

Branch: refs/heads/master
Commit: 4e8409796a8fb80293db39d62745fbbcce6447cd
Parents: 06acdc1
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Feb 11 19:50:46 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 22 17:32:38 2018 +0100

----------------------------------------------------------------------
 ...nchronousOperationTriggerMessageHeaders.java |   3 +-
 .../rest/handler/job/RescalingHandlers.java     | 111 ----------------
 .../job/RescalingStatusMessageParameters.java   |  39 ------
 .../job/RescalingTriggerMessageParameters.java  |  40 ------
 .../job/rescaling/RescalingHandlers.java        | 131 +++++++++++++++++++
 .../job/rescaling/RescalingStatusHeaders.java   |  78 +++++++++++
 .../RescalingStatusMessageParameters.java       |  40 ++++++
 .../job/rescaling/RescalingTriggerHeaders.java  |  70 ++++++++++
 .../RescalingTriggerMessageParameters.java      |  40 ++++++
 .../RescaleParallelismQueryParameter.java       |  41 ------
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  22 +++-
 11 files changed, 381 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
index b317028..5baa5ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
@@ -28,7 +28,8 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
  * @param <R> type of the request
  * @param <M> type of the message parameters
  */
-public abstract class AsynchronousOperationTriggerMessageHeaders<R extends 
RequestBody, M extends MessageParameters> implements MessageHeaders<R, 
TriggerResponse, M> {
+public abstract class AsynchronousOperationTriggerMessageHeaders<R extends 
RequestBody, M extends MessageParameters>
+       implements MessageHeaders<R, TriggerResponse, M> {
 
        @Override
        public Class<TriggerResponse> getResponseClass() {

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
deleted file mode 100644
index 6f8895a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
-import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
-import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
-import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.RescaleParallelismQueryParameter;
-import org.apache.flink.runtime.rest.messages.TriggerId;
-import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.SerializedThrowable;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Rest handler to trigger and poll the rescaling of a running job.
- */
-public class RescalingHandlers extends 
AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> 
{
-
-       /**
-        * Handler which triggers the rescaling of the specified job.
-        */
-       public class RescalingTriggerHandler extends 
TriggerHandler<RestfulGateway, EmptyRequestBody, 
RescalingTriggerMessageParameters> {
-
-               protected RescalingTriggerHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, 
Time timeout, Map<String, String> responseHeaders, 
MessageHeaders<EmptyRequestBody, TriggerResponse, 
RescalingTriggerMessageParameters> messageHeaders) {
-                       super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
-               }
-
-               @Override
-               protected CompletableFuture<Acknowledge> 
triggerOperation(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws 
RestHandlerException {
-                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       final List<Integer> queryParameter = 
request.getQueryParameter(RescaleParallelismQueryParameter.class);
-
-                       if (queryParameter.isEmpty()) {
-                               throw new RestHandlerException("No new 
parallelism was specified.", HttpResponseStatus.BAD_REQUEST);
-                       }
-
-                       final int newParallelism = queryParameter.get(0);
-
-                       final CompletableFuture<Acknowledge> rescalingFuture = 
gateway.rescaleJob(jobId, newParallelism, RescalingBehaviour.STRICT, timeout);
-
-                       return rescalingFuture;
-               }
-
-               @Override
-               protected AsynchronousJobOperationKey 
createOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request) {
-                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       return AsynchronousJobOperationKey.of(new TriggerId(), 
jobId);
-               }
-       }
-
-       /**
-        * Handler which reports the status of the rescaling operation.
-        */
-       public class RescalingStatusHandler extends 
StatusHandler<RestfulGateway, AsynchronousOperationInfo, 
RescalingStatusMessageParameters> {
-
-               protected RescalingStatusHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, 
Time timeout, Map<String, String> responseHeaders, 
MessageHeaders<EmptyRequestBody, 
AsynchronousOperationResult<AsynchronousOperationInfo>, 
RescalingStatusMessageParameters> messageHeaders) {
-                       super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
-               }
-
-               @Override
-               protected AsynchronousJobOperationKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingStatusMessageParameters> request) {
-                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       final TriggerId triggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
-
-                       return AsynchronousJobOperationKey.of(triggerId, jobId);
-               }
-
-               @Override
-               protected AsynchronousOperationInfo 
exceptionalOperationResultResponse(Throwable throwable) {
-                       return 
AsynchronousOperationInfo.completeExceptional(new 
SerializedThrowable(throwable));
-               }
-
-               @Override
-               protected AsynchronousOperationInfo 
operationResultResponse(Acknowledge operationResult) {
-                       return AsynchronousOperationInfo.complete();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
deleted file mode 100644
index 4821b4f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessagePathParameter;
-import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}.
- */
-public class RescalingStatusMessageParameters extends JobMessageParameters {
-
-       public final TriggerIdPathParameter triggerIdPathParameter = new 
TriggerIdPathParameter();
-
-       @Override
-       public Collection<MessagePathParameter<?>> getPathParameters() {
-               return Arrays.asList(jobPathParameter, triggerIdPathParameter);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
deleted file mode 100644
index 096baa1..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
-import 
org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * {@link MessageParameters} for triggering the rescaling of a job.
- */
-public class RescalingTriggerMessageParameters extends JobMessageParameters {
-
-       public final RescalingParallelismQueryParameter 
rescalingParallelismQueryParameter = new RescalingParallelismQueryParameter();
-
-       @Override
-       public Collection<MessageQueryParameter<?>> getQueryParameters() {
-               return 
Collections.singleton(rescalingParallelismQueryParameter);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
new file mode 100644
index 0000000..3e4ae5a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescaling;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import 
org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedThrowable;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Rest handler to trigger and poll the rescaling of a running job.
+ */
+public class RescalingHandlers extends 
AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> 
{
+
+       /**
+        * Handler which triggers the rescaling of the specified job.
+        */
+       public class RescalingTriggerHandler extends 
TriggerHandler<RestfulGateway, EmptyRequestBody, 
RescalingTriggerMessageParameters> {
+
+               public RescalingTriggerHandler(
+                               CompletableFuture<String> localRestAddress,
+                               GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                               Time timeout,
+                               Map<String, String> responseHeaders) {
+                       super(
+                               localRestAddress,
+                               leaderRetriever,
+                               timeout,
+                               responseHeaders,
+                               RescalingTriggerHeaders.getInstance());
+               }
+
+               @Override
+               protected CompletableFuture<Acknowledge> 
triggerOperation(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws 
RestHandlerException {
+                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+                       final List<Integer> queryParameter = 
request.getQueryParameter(RescalingParallelismQueryParameter.class);
+
+                       if (queryParameter.isEmpty()) {
+                               throw new RestHandlerException("No new 
parallelism was specified.", HttpResponseStatus.BAD_REQUEST);
+                       }
+
+                       final int newParallelism = queryParameter.get(0);
+
+                       final CompletableFuture<Acknowledge> rescalingFuture = 
gateway.rescaleJob(
+                               jobId,
+                               newParallelism,
+                               RescalingBehaviour.STRICT,
+                               timeout);
+
+                       return rescalingFuture;
+               }
+
+               @Override
+               protected AsynchronousJobOperationKey 
createOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request) {
+                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+                       return AsynchronousJobOperationKey.of(new TriggerId(), 
jobId);
+               }
+       }
+
+       /**
+        * Handler which reports the status of the rescaling operation.
+        */
+       public class RescalingStatusHandler extends 
StatusHandler<RestfulGateway, AsynchronousOperationInfo, 
RescalingStatusMessageParameters> {
+
+               public RescalingStatusHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       Map<String, String> responseHeaders) {
+                       super(
+                               localRestAddress,
+                               leaderRetriever,
+                               timeout,
+                               responseHeaders,
+                               RescalingStatusHeaders.getInstance());
+               }
+
+               @Override
+               protected AsynchronousJobOperationKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingStatusMessageParameters> request) {
+                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+                       final TriggerId triggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
+
+                       return AsynchronousJobOperationKey.of(triggerId, jobId);
+               }
+
+               @Override
+               protected AsynchronousOperationInfo 
exceptionalOperationResultResponse(Throwable throwable) {
+                       return 
AsynchronousOperationInfo.completeExceptional(new 
SerializedThrowable(throwable));
+               }
+
+               @Override
+               protected AsynchronousOperationInfo 
operationResultResponse(Acknowledge operationResult) {
+                       return AsynchronousOperationInfo.complete();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java
new file mode 100644
index 0000000..2d5babb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescaling;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
+import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for polling the status of an ongoing rescaling operation.
+ */
+public class RescalingStatusHeaders extends
+       AsynchronousOperationStatusMessageHeaders<AsynchronousOperationInfo, 
RescalingStatusMessageParameters> {
+
+       private static final RescalingStatusHeaders INSTANCE = new 
RescalingStatusHeaders();
+
+       private static final String URL = String.format(
+               "/jobs/:%s/rescaling/:%s",
+               JobIDPathParameter.KEY,
+               TriggerIdPathParameter.KEY);
+
+       private RescalingStatusHeaders() {}
+
+       @Override
+       protected Class<AsynchronousOperationInfo> getValueClass() {
+               return AsynchronousOperationInfo.class;
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public RescalingStatusMessageParameters 
getUnresolvedMessageParameters() {
+               return new RescalingStatusMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static RescalingStatusHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java
new file mode 100644
index 0000000..938a363
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescaling;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * {@link MessageParameters} for polling the status of a rescaling operation.
+ */
+public class RescalingStatusMessageParameters extends JobMessageParameters {
+
+       public final TriggerIdPathParameter triggerIdPathParameter = new 
TriggerIdPathParameter();
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Arrays.asList(jobPathParameter, triggerIdPathParameter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java
new file mode 100644
index 0000000..9567410
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescaling;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for triggering the rescaling of a job.
+ */
+public class RescalingTriggerHeaders extends
+       AsynchronousOperationTriggerMessageHeaders<EmptyRequestBody, 
RescalingTriggerMessageParameters> {
+
+       private static final RescalingTriggerHeaders INSTANCE = new 
RescalingTriggerHeaders();
+
+       private static final String URL = String.format(
+               "/jobs/:%s/rescaling",
+               JobIDPathParameter.KEY);
+
+       private RescalingTriggerHeaders() {}
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public RescalingTriggerMessageParameters 
getUnresolvedMessageParameters() {
+               return new RescalingTriggerMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.PATCH;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static RescalingTriggerHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java
new file mode 100644
index 0000000..4b5d307
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescaling;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import 
org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for triggering the rescaling of a job.
+ */
+public class RescalingTriggerMessageParameters extends JobMessageParameters {
+
+       public final RescalingParallelismQueryParameter 
rescalingParallelismQueryParameter = new RescalingParallelismQueryParameter();
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return 
Collections.singleton(rescalingParallelismQueryParameter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
deleted file mode 100644
index 8058ab9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-/**
- * Parallelism for the rescaling of jobs specified as a {@link 
MessageQueryParameter}.
- */
-public class RescaleParallelismQueryParameter extends 
MessageQueryParameter<Integer> {
-
-       public static final String KEY = "parallelism";
-
-       protected RescaleParallelismQueryParameter() {
-               super(KEY, MessageParameterRequisiteness.MANDATORY);
-       }
-
-       @Override
-       public Integer convertValueFromString(String value) {
-               return Integer.valueOf(value);
-       }
-
-       @Override
-       public String convertStringToValue(Integer value) {
-               return value.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 427332f..10a3650 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -61,6 +61,9 @@ import 
org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
+import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders;
+import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
 import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
 import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -448,8 +451,21 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        SubtaskCurrentAttemptDetailsHeaders.getInstance(),
                        executionGraphCache,
                        executor,
-                       metricFetcher
-               );
+                       metricFetcher);
+
+               final RescalingHandlers rescalingHandlers = new 
RescalingHandlers();
+
+               final RescalingHandlers.RescalingTriggerHandler 
rescalingTriggerHandler = rescalingHandlers.new RescalingTriggerHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders);
+
+               final RescalingHandlers.RescalingStatusHandler 
rescalingStatusHandler = rescalingHandlers.new RescalingStatusHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders);
 
                JobVertexBackPressureHandler jobVertexBackPressureHandler = new 
JobVertexBackPressureHandler(
                        restAddressFuture,
@@ -532,6 +548,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                
handlers.add(Tuple2.of(JobVertexBackPressureHeaders.getInstance(), 
jobVertexBackPressureHandler));
                handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), 
jobCancelTerminationHandler));
                handlers.add(Tuple2.of(JobVertexDetailsHeaders.getInstance(), 
jobVertexDetailsHandler));
+               handlers.add(Tuple2.of(RescalingTriggerHeaders.getInstance(), 
rescalingTriggerHandler));
+               handlers.add(Tuple2.of(RescalingStatusHeaders.getInstance(), 
rescalingStatusHandler));
 
                // TODO: Remove once the Yarn proxy can forward all REST verbs
                
handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), 
jobCancelTerminationHandler));

Reply via email to