[FLINK-8635] [rest] Register rescaling handlers at web endpoint

This closes #5454.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e840979 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e840979 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e840979 Branch: refs/heads/master Commit: 4e8409796a8fb80293db39d62745fbbcce6447cd Parents: 06acdc1 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Feb 11 19:50:46 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Feb 22 17:32:38 2018 +0100 ---------------------------------------------------------------------- ...nchronousOperationTriggerMessageHeaders.java | 3 +- .../rest/handler/job/RescalingHandlers.java | 111 ---------------- .../job/RescalingStatusMessageParameters.java | 39 ------ .../job/RescalingTriggerMessageParameters.java | 40 ------ .../job/rescaling/RescalingHandlers.java | 131 +++++++++++++++++++ .../job/rescaling/RescalingStatusHeaders.java | 78 +++++++++++ .../RescalingStatusMessageParameters.java | 40 ++++++ .../job/rescaling/RescalingTriggerHeaders.java | 70 ++++++++++ .../RescalingTriggerMessageParameters.java | 40 ++++++ .../RescaleParallelismQueryParameter.java | 41 ------ .../runtime/webmonitor/WebMonitorEndpoint.java | 22 +++- 11 files changed, 381 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java index b317028..5baa5ad 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java @@ -28,7 +28,8 @@ import org.apache.flink.runtime.rest.messages.RequestBody; * @param <R> type of the request * @param <M> type of the message parameters */ -public abstract class AsynchronousOperationTriggerMessageHeaders<R extends RequestBody, M extends MessageParameters> implements MessageHeaders<R, TriggerResponse, M> { +public abstract class AsynchronousOperationTriggerMessageHeaders<R extends RequestBody, M extends MessageParameters> + implements MessageHeaders<R, TriggerResponse, M> { @Override public Class<TriggerResponse> getResponseClass() { http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java deleted file mode 100644 index 6f8895a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingHandlers.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.handler.job; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.jobmaster.RescalingBehaviour; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rest.handler.HandlerRequest; -import org.apache.flink.runtime.rest.handler.RestHandlerException; -import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; -import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; -import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; -import org.apache.flink.runtime.rest.handler.async.TriggerResponse; -import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.JobIDPathParameter; -import org.apache.flink.runtime.rest.messages.MessageHeaders; -import org.apache.flink.runtime.rest.messages.RescaleParallelismQueryParameter; -import org.apache.flink.runtime.rest.messages.TriggerId; -import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; -import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.util.SerializedThrowable; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -/** - * Rest handler to trigger and poll the rescaling of a running job. 
- */ -public class RescalingHandlers extends AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> { - - /** - * Handler which triggers the rescaling of the specified job. - */ - public class RescalingTriggerHandler extends TriggerHandler<RestfulGateway, EmptyRequestBody, RescalingTriggerMessageParameters> { - - protected RescalingTriggerHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, TriggerResponse, RescalingTriggerMessageParameters> messageHeaders) { - super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); - } - - @Override - protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws RestHandlerException { - final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - final List<Integer> queryParameter = request.getQueryParameter(RescaleParallelismQueryParameter.class); - - if (queryParameter.isEmpty()) { - throw new RestHandlerException("No new parallelism was specified.", HttpResponseStatus.BAD_REQUEST); - } - - final int newParallelism = queryParameter.get(0); - - final CompletableFuture<Acknowledge> rescalingFuture = gateway.rescaleJob(jobId, newParallelism, RescalingBehaviour.STRICT, timeout); - - return rescalingFuture; - } - - @Override - protected AsynchronousJobOperationKey createOperationKey(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request) { - final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - return AsynchronousJobOperationKey.of(new TriggerId(), jobId); - } - } - - /** - * Handler which reports the status of the rescaling operation. 
- */ - public class RescalingStatusHandler extends StatusHandler<RestfulGateway, AsynchronousOperationInfo, RescalingStatusMessageParameters> { - - protected RescalingStatusHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, AsynchronousOperationResult<AsynchronousOperationInfo>, RescalingStatusMessageParameters> messageHeaders) { - super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); - } - - @Override - protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody, RescalingStatusMessageParameters> request) { - final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); - - return AsynchronousJobOperationKey.of(triggerId, jobId); - } - - @Override - protected AsynchronousOperationInfo exceptionalOperationResultResponse(Throwable throwable) { - return AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(throwable)); - } - - @Override - protected AsynchronousOperationInfo operationResultResponse(Acknowledge operationResult) { - return AsynchronousOperationInfo.complete(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java deleted file mode 100644 index 4821b4f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingStatusMessageParameters.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.handler.job; - -import org.apache.flink.runtime.rest.messages.JobMessageParameters; -import org.apache.flink.runtime.rest.messages.MessagePathParameter; -import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; - -import java.util.Arrays; -import java.util.Collection; - -/** - * Message headers for the {@link RescalingHandlers.RescalingStatusHandler}. 
- */ -public class RescalingStatusMessageParameters extends JobMessageParameters { - - public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter(); - - @Override - public Collection<MessagePathParameter<?>> getPathParameters() { - return Arrays.asList(jobPathParameter, triggerIdPathParameter); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java deleted file mode 100644 index 096baa1..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/RescalingTriggerMessageParameters.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.rest.handler.job; - -import org.apache.flink.runtime.rest.messages.JobMessageParameters; -import org.apache.flink.runtime.rest.messages.MessageParameters; -import org.apache.flink.runtime.rest.messages.MessageQueryParameter; -import org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter; - -import java.util.Collection; -import java.util.Collections; - -/** - * {@link MessageParameters} for triggering the rescaling of a job. - */ -public class RescalingTriggerMessageParameters extends JobMessageParameters { - - public final RescalingParallelismQueryParameter rescalingParallelismQueryParameter = new RescalingParallelismQueryParameter(); - - @Override - public Collection<MessageQueryParameter<?>> getQueryParameters() { - return Collections.singleton(rescalingParallelismQueryParameter); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java new file mode 100644 index 0000000..3e4ae5a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.rescaling; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.SerializedThrowable; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Rest handler to trigger and poll the rescaling of a running job. 
+ */ +public class RescalingHandlers extends AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> { + + /** + * Handler which triggers the rescaling of the specified job. + */ + public class RescalingTriggerHandler extends TriggerHandler<RestfulGateway, EmptyRequestBody, RescalingTriggerMessageParameters> { + + public RescalingTriggerHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders) { + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + RescalingTriggerHeaders.getInstance()); + } + + @Override + protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws RestHandlerException { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final List<Integer> queryParameter = request.getQueryParameter(RescalingParallelismQueryParameter.class); + + if (queryParameter.isEmpty()) { + throw new RestHandlerException("No new parallelism was specified.", HttpResponseStatus.BAD_REQUEST); + } + + final int newParallelism = queryParameter.get(0); + + final CompletableFuture<Acknowledge> rescalingFuture = gateway.rescaleJob( + jobId, + newParallelism, + RescalingBehaviour.STRICT, + timeout); + + return rescalingFuture; + } + + @Override + protected AsynchronousJobOperationKey createOperationKey(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request) { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + return AsynchronousJobOperationKey.of(new TriggerId(), jobId); + } + } + + /** + * Handler which reports the status of the rescaling operation. 
+ */ + public class RescalingStatusHandler extends StatusHandler<RestfulGateway, AsynchronousOperationInfo, RescalingStatusMessageParameters> { + + public RescalingStatusHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders) { + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + RescalingStatusHeaders.getInstance()); + } + + @Override + protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody, RescalingStatusMessageParameters> request) { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); + + return AsynchronousJobOperationKey.of(triggerId, jobId); + } + + @Override + protected AsynchronousOperationInfo exceptionalOperationResultResponse(Throwable throwable) { + return AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(throwable)); + } + + @Override + protected AsynchronousOperationInfo operationResultResponse(Acknowledge operationResult) { + return AsynchronousOperationInfo.complete(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java new file mode 100644 index 0000000..2d5babb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusHeaders.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.rescaling; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for polling the status of an ongoing rescaling operation. 
+ */ +public class RescalingStatusHeaders extends + AsynchronousOperationStatusMessageHeaders<AsynchronousOperationInfo, RescalingStatusMessageParameters> { + + private static final RescalingStatusHeaders INSTANCE = new RescalingStatusHeaders(); + + private static final String URL = String.format( + "/jobs/:%s/rescaling/:%s", + JobIDPathParameter.KEY, + TriggerIdPathParameter.KEY); + + private RescalingStatusHeaders() {} + + @Override + protected Class<AsynchronousOperationInfo> getValueClass() { + return AsynchronousOperationInfo.class; + } + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public RescalingStatusMessageParameters getUnresolvedMessageParameters() { + return new RescalingStatusMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static RescalingStatusHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java new file mode 100644 index 0000000..938a363 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingStatusMessageParameters.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.rescaling; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * {@link MessageParameters} for polling the status of a rescaling operation. 
+ */ +public class RescalingStatusMessageParameters extends JobMessageParameters { + + public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Arrays.asList(jobPathParameter, triggerIdPathParameter); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java new file mode 100644 index 0000000..9567410 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerHeaders.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.rescaling; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for triggering the rescaling of a job. + */ +public class RescalingTriggerHeaders extends + AsynchronousOperationTriggerMessageHeaders<EmptyRequestBody, RescalingTriggerMessageParameters> { + + private static final RescalingTriggerHeaders INSTANCE = new RescalingTriggerHeaders(); + + private static final String URL = String.format( + "/jobs/:%s/rescaling", + JobIDPathParameter.KEY); + + private RescalingTriggerHeaders() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public RescalingTriggerMessageParameters getUnresolvedMessageParameters() { + return new RescalingTriggerMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.PATCH; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static RescalingTriggerHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java new file mode 100644 index 0000000..4b5d307 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingTriggerMessageParameters.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.rescaling; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * {@link MessageParameters} for triggering the rescaling of a job. 
+ */ +public class RescalingTriggerMessageParameters extends JobMessageParameters { + + public final RescalingParallelismQueryParameter rescalingParallelismQueryParameter = new RescalingParallelismQueryParameter(); + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.singleton(rescalingParallelismQueryParameter); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java deleted file mode 100644 index 8058ab9..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RescaleParallelismQueryParameter.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -/** - * Parallelism for the rescaling of jobs specified as a {@link MessageQueryParameter}. 
- */ -public class RescaleParallelismQueryParameter extends MessageQueryParameter<Integer> { - - public static final String KEY = "parallelism"; - - protected RescaleParallelismQueryParameter() { - super(KEY, MessageParameterRequisiteness.MANDATORY); - } - - @Override - public Integer convertValueFromString(String value) { - return Integer.valueOf(value); - } - - @Override - public String convertStringToValue(Integer value) { - return value.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4e840979/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 427332f..10a3650 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -61,6 +61,9 @@ import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders; +import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers; import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; @@ -448,8 +451,21 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends 
RestServerEndp SubtaskCurrentAttemptDetailsHeaders.getInstance(), executionGraphCache, executor, - metricFetcher - ); + metricFetcher); + + final RescalingHandlers rescalingHandlers = new RescalingHandlers(); + + final RescalingHandlers.RescalingTriggerHandler rescalingTriggerHandler = rescalingHandlers.new RescalingTriggerHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); + + final RescalingHandlers.RescalingStatusHandler rescalingStatusHandler = rescalingHandlers.new RescalingStatusHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); JobVertexBackPressureHandler jobVertexBackPressureHandler = new JobVertexBackPressureHandler( restAddressFuture, @@ -532,6 +548,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add(Tuple2.of(JobVertexBackPressureHeaders.getInstance(), jobVertexBackPressureHandler)); handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); handlers.add(Tuple2.of(JobVertexDetailsHeaders.getInstance(), jobVertexDetailsHandler)); + handlers.add(Tuple2.of(RescalingTriggerHeaders.getInstance(), rescalingTriggerHandler)); + handlers.add(Tuple2.of(RescalingStatusHeaders.getInstance(), rescalingStatusHandler)); // TODO: Remove once the Yarn proxy can forward all REST verbs handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));