Repository: flink
Updated Branches:
  refs/heads/master 55b76d54f -> dbabdb1cc


[FLINK-7534] Create LegacyRestHandlerAdapter for old REST handlers

Introduce the LegacyRestHandler interface, which the old REST handlers have to
implement in order to be usable by the RestServerEndpoint in combination with
the LegacyRestHandlerAdapter. The LegacyRestHandlerAdapter extends the
AbstractRestHandler and delegates each request to the wrapped LegacyRestHandler
implementation.

As an example, this commit ports the ClusterOverviewHandler to the new
interface. The Dispatcher side still has to be implemented properly: for now,
Dispatcher#requestStatusOverview returns hard-coded placeholder values.

This closes #4603.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbabdb1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbabdb1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbabdb1c

Branch: refs/heads/master
Commit: dbabdb1cc2c122dbf1e83ffb9960491eaf4914bb
Parents: 55b76d5
Author: Till Rohrmann <[email protected]>
Authored: Fri Aug 18 16:18:19 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Sep 20 17:50:27 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  15 +++
 .../runtime/dispatcher/DispatcherGateway.java   |   3 +
 .../dispatcher/DispatcherRestEndpoint.java      |  35 ++++-
 .../entrypoint/SessionClusterEntrypoint.java    |  12 +-
 .../messages/webmonitor/JobsOverview.java       |  25 +++-
 .../messages/webmonitor/StatusOverview.java     |  25 +++-
 .../webmonitor/StatusOverviewWithVersion.java   | 128 +++++++++++++++++++
 .../rest/handler/AbstractRestHandler.java       |   4 +-
 .../runtime/rest/handler/LegacyRestHandler.java |  38 ++++++
 .../rest/handler/LegacyRestHandlerAdapter.java  |  60 +++++++++
 .../runtime/rest/handler/RedirectHandler.java   |  79 +++++++-----
 .../handler/legacy/ClusterOverviewHandler.java  |  38 ++++--
 .../rest/messages/ClusterOverviewHeaders.java   |  72 +++++++++++
 .../rest/messages/EmptyMessageParameters.java   |  46 +++++++
 .../runtime/rest/messages/EmptyRequestBody.java |  25 ++++
 .../StatusOverviewWithVersionTest.java          |  60 +++++++++
 16 files changed, 602 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 521fd8b..6b9999c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
@@ -243,6 +244,20 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                return restAddressFuture;
        }
 
+       @Override
+       public CompletableFuture<StatusOverview> requestStatusOverview(Time 
timeout) {
+               // TODO: Implement proper cluster overview generation
+               return CompletableFuture.completedFuture(
+                       new StatusOverview(
+                               42,
+                               1337,
+                               1337,
+                               5,
+                               6,
+                               7,
+                               8));
+       }
+
        /**
         * Cleans up the job related data from the dispatcher. If cleanupHA is 
true, then
         * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 398befb..ee5484e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -53,4 +54,6 @@ public interface DispatcherGateway extends 
FencedRpcGateway<DispatcherId>, Restf
         */
        CompletableFuture<Collection<JobID>> listJobs(
                @RpcTimeout Time timeout);
+
+       CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout 
Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index debd674..1f64c67 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -20,11 +20,16 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FileUtils;
@@ -34,10 +39,11 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * REST endpoint for the {@link Dispatcher} component.
@@ -47,20 +53,36 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
        private final GatewayRetriever<DispatcherGateway> leaderRetriever;
        private final Time timeout;
        private final File tmpDir;
+       private final Executor executor;
 
        public DispatcherRestEndpoint(
                        RestServerEndpointConfiguration configuration,
                        GatewayRetriever<DispatcherGateway> leaderRetriever,
                        Time timeout,
-                       File tmpDir) {
+                       File tmpDir,
+                       Executor executor) {
                super(configuration);
                this.leaderRetriever = 
Preconditions.checkNotNull(leaderRetriever);
                this.timeout = Preconditions.checkNotNull(timeout);
                this.tmpDir = Preconditions.checkNotNull(tmpDir);
+               this.executor = Preconditions.checkNotNull(executor);
        }
 
        @Override
        protected Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> 
restAddressFuture) {
+               ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = new ArrayList<>(2);
+
+               LegacyRestHandlerAdapter<DispatcherGateway, 
StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new 
LegacyRestHandlerAdapter<>(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       ClusterOverviewHeaders.getInstance(),
+                       new ClusterOverviewHandler(
+                               executor,
+                               timeout));
+
+               handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), 
clusterOverviewHandler));
+
                Optional<StaticFileServerHandler<DispatcherGateway>> 
optWebContent;
 
                try {
@@ -74,11 +96,10 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        optWebContent = Optional.empty();
                }
 
-               return optWebContent
-                       .map(webContent ->
-                               Collections.singleton(
-                                       Tuple2.<RestHandlerSpecification, 
ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), 
webContent)))
-                       .orElseGet(() -> Collections.emptySet());
+               optWebContent.ifPresent(
+                       webContent -> 
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), 
webContent)));
+
+               return handlers;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 80bd384..9f4e04a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException;
 
 import java.io.File;
 import java.util.Optional;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for session cluster entry points.
@@ -81,7 +82,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
 
                dispatcherRestEndpoint = createDispatcherRestEndpoint(
                        configuration,
-                       dispatcherGatewayRetriever);
+                       dispatcherGatewayRetriever,
+                       rpcService.getExecutor());
 
                LOG.debug("Starting Dispatcher REST endpoint.");
                dispatcherRestEndpoint.start();
@@ -151,8 +153,9 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
        }
 
        protected DispatcherRestEndpoint createDispatcherRestEndpoint(
-               Configuration configuration,
-               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever) throws Exception {
+                       Configuration configuration,
+                       LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
+                       Executor executor) throws Exception {
 
                Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
                File tmpDir = new 
File(configuration.getString(WebOptions.TMP_DIR));
@@ -161,7 +164,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        
RestServerEndpointConfiguration.fromConfiguration(configuration),
                        dispatcherGatewayRetriever,
                        timeout,
-                       tmpDir);
+                       tmpDir,
+                       executor);
        }
 
        protected Dispatcher createDispatcher(

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
index 084e97d..9526bc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
@@ -18,20 +18,39 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
  * An overview of how many jobs are in which status.
  */
 public class JobsOverview implements InfoMessage {
 
        private static final long serialVersionUID = -3699051943490133183L;
-       
+
+       public static final String FIELD_NAME_JOBS_RUNNING = "jobs-running";
+       public static final String FIELD_NAME_JOBS_FINISHED = "jobs-finished";
+       public static final String FIELD_NAME_JOBS_CANCELLED = "jobs-cancelled";
+       public static final String FIELD_NAME_JOBS_FAILED = "jobs-failed";
+
+       @JsonProperty(FIELD_NAME_JOBS_RUNNING)
        private final int numJobsRunningOrPending;
+
+       @JsonProperty(FIELD_NAME_JOBS_FINISHED)
        private final int numJobsFinished;
+
+       @JsonProperty(FIELD_NAME_JOBS_CANCELLED)
        private final int numJobsCancelled;
+
+       @JsonProperty(FIELD_NAME_JOBS_FAILED)
        private final int numJobsFailed;
 
-       public JobsOverview(int numJobsRunningOrPending, int numJobsFinished,
-                                               int numJobsCancelled, int 
numJobsFailed) {
+       @JsonCreator
+       public JobsOverview(
+                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
+                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
+                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
+                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int 
numJobsFailed) {
                
                this.numJobsRunningOrPending = numJobsRunningOrPending;
                this.numJobsFinished = numJobsFinished;

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
index 214141e..2c04b7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a 
description
  * of the Flink cluster status.
@@ -25,13 +28,29 @@ package org.apache.flink.runtime.messages.webmonitor;
 public class StatusOverview extends JobsOverview {
 
        private static final long serialVersionUID = -729861859715105265L;
-       
+
+       public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
+       public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
+       public static final String FIELD_NAME_SLOTS_AVAILABLE = 
"slots-available";
+
+       @JsonProperty(FIELD_NAME_TASKMANAGERS)
        private final int numTaskManagersConnected;
+
+       @JsonProperty(FIELD_NAME_SLOTS_TOTAL)
        private final int numSlotsTotal;
+
+       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
        private final int numSlotsAvailable;
 
-       public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, 
int numSlotsAvailable,
-                                                       int 
numJobsRunningOrPending, int numJobsFinished, int numJobsCancelled, int 
numJobsFailed) {
+       @JsonCreator
+       public StatusOverview(
+                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int 
numTaskManagersConnected,
+                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int 
numSlotsAvailable,
+                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
+                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
+                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
+                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int 
numJobsFailed) {
 
                super(numJobsRunningOrPending, numJobsFinished, 
numJobsCancelled, numJobsFailed);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
new file mode 100644
index 0000000..9029537
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Status overview message including the current Flink version and commit id.
+ */
+public class StatusOverviewWithVersion extends StatusOverview implements 
ResponseBody {
+
+       private static final long serialVersionUID = 5000058311783413216L;
+
+       public static final String FIELD_NAME_VERSION = "flink-version";
+       public static final String FIELD_NAME_COMMIT = "flink-commit";
+
+       @JsonProperty(FIELD_NAME_VERSION)
+       private final String version;
+
+       @JsonProperty(FIELD_NAME_COMMIT)
+       private final String commitId;
+
+       @JsonCreator
+       public StatusOverviewWithVersion(
+                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int 
numTaskManagersConnected,
+                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int 
numSlotsAvailable,
+                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
+                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
+                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
+                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
+                       @JsonProperty(FIELD_NAME_VERSION) String version,
+                       @JsonProperty(FIELD_NAME_COMMIT) String commitId) {
+               super(
+                       numTaskManagersConnected,
+                       numSlotsTotal,
+                       numSlotsAvailable,
+                       numJobsRunningOrPending,
+                       numJobsFinished,
+                       numJobsCancelled,
+                       numJobsFailed);
+
+               this.version = Preconditions.checkNotNull(version);
+               this.commitId = Preconditions.checkNotNull(commitId);
+       }
+
+       public StatusOverviewWithVersion(
+                       int numTaskManagersConnected,
+                       int numSlotsTotal,
+                       int numSlotsAvailable,
+                       JobsOverview jobs1,
+                       JobsOverview jobs2,
+                       String version,
+                       String commitId) {
+               super(numTaskManagersConnected, numSlotsTotal, 
numSlotsAvailable, jobs1, jobs2);
+
+               this.version = Preconditions.checkNotNull(version);
+               this.commitId = Preconditions.checkNotNull(commitId);
+       }
+
+       public static StatusOverviewWithVersion 
fromStatusOverview(StatusOverview statusOverview, String version, String 
commitId) {
+               return new StatusOverviewWithVersion(
+                       statusOverview.getNumTaskManagersConnected(),
+                       statusOverview.getNumSlotsTotal(),
+                       statusOverview.getNumSlotsAvailable(),
+                       statusOverview.getNumJobsRunningOrPending(),
+                       statusOverview.getNumJobsFinished(),
+                       statusOverview.getNumJobsCancelled(),
+                       statusOverview.getNumJobsFailed(),
+                       version,
+                       commitId);
+       }
+
+       public String getVersion() {
+               return version;
+       }
+
+       public String getCommitId() {
+               return commitId;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+
+               StatusOverviewWithVersion that = (StatusOverviewWithVersion) o;
+
+               return Objects.equals(version, that.getVersion()) && 
Objects.equals(commitId, that.getCommitId());
+       }
+
+       @Override
+       public int hashCode() {
+               int result = super.hashCode();
+               result = 31 * result + (version != null ? version.hashCode() : 
0);
+               result = 31 * result + (commitId != null ? commitId.hashCode() 
: 0);
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 596c947..697c046 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -66,11 +66,11 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
        private final MessageHeaders<R, P, M> messageHeaders;
 
        protected AbstractRestHandler(
-                       CompletableFuture<String> localAddressFuture,
+                       CompletableFuture<String> localRestAddress,
                        GatewayRetriever<T> leaderRetriever,
                        Time timeout,
                        MessageHeaders<R, P, M> messageHeaders) {
-               super(localAddressFuture, leaderRetriever, timeout);
+               super(localRestAddress, leaderRetriever, timeout);
                this.messageHeaders = messageHeaders;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java
new file mode 100644
index 0000000..3e70575
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface which Flink's legacy REST handlers have to implement in order to 
be usable
+ * via the {@link LegacyRestHandlerAdapter}.
+ *
+ * @param <T> type of the gateway
+ * @param <R> type of the REST response
+ */
+public interface LegacyRestHandler<T extends RestfulGateway, R extends 
ResponseBody, M extends MessageParameters> {
+
+       CompletableFuture<R> handleRequest(HandlerRequest<EmptyRequestBody, M> 
request, T gateway);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
new file mode 100644
index 0000000..e9eaff7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Adapter for Flink's legacy REST handlers.
+ *
+ * @param <T> type of the gateway
+ * @param <R> type of the REST response
+ * @param <M> type of the MessageParameters
+ */
+public class LegacyRestHandlerAdapter<T extends RestfulGateway, R extends 
ResponseBody, M extends MessageParameters> extends AbstractRestHandler<T, 
EmptyRequestBody, R, M> {
+
+       private final LegacyRestHandler<T, R, M> legacyRestHandler;
+
+       public LegacyRestHandlerAdapter(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<T> leaderRetriever,
+                       Time timeout,
+                       MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
+                       LegacyRestHandler<T, R, M> legacyRestHandler) {
+               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders);
+
+               this.legacyRestHandler = 
Preconditions.checkNotNull(legacyRestHandler);
+       }
+
+       @Override
+       protected CompletableFuture<R> handleRequest(@Nonnull 
HandlerRequest<EmptyRequestBody, M> request, @Nonnull T gateway) throws 
RestHandlerException {
+               return legacyRestHandler.handleRequest(request, gateway);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
index 83550cd..dfede98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,22 +79,24 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                ChannelHandlerContext channelHandlerContext,
                Routed routed) throws Exception {
 
-               try {
-                       if (localAddressFuture.isDone()) {
-                               if (localAddress == null) {
-                                       try {
-                                               localAddress = 
localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-                                       } catch (Exception e) {
-                                               logger.error("Could not obtain 
local address.", e);
+               if (localAddressFuture.isDone()) {
+                       if (localAddress == null) {
+                               try {
+                                       localAddress = 
localAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                               } catch (Exception e) {
+                                       logger.error("Could not obtain local 
address.", e);
 
-                                               HandlerUtils.sendErrorResponse(
-                                                       channelHandlerContext,
-                                                       routed.request(),
-                                                       new 
ErrorResponseBody("Fatal error. Could not obtain local address. Please try to 
refresh."),
-                                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR);
-                                       }
+                                       HandlerUtils.sendErrorResponse(
+                                               channelHandlerContext,
+                                               routed.request(),
+                                               new ErrorResponseBody("Fatal 
error. Could not obtain local address. Please try to refresh."),
+                                               
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+
+                                       return;
                                }
+                       }
 
+                       try {
                                OptionalConsumer<T> optLeaderConsumer = 
OptionalConsumer.of(leaderRetriever.getNow());
 
                                optLeaderConsumer.ifPresent(
@@ -103,34 +106,42 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                                                        gateway,
                                                        timeout);
 
+                                               // retain the message for the 
asynchronous handler
+                                               
ReferenceCountUtil.retain(routed);
+
                                                
optRedirectAddressFuture.whenComplete(
                                                        (Optional<String> 
optRedirectAddress, Throwable throwable) -> {
                                                                HttpResponse 
response;
-                                                               if (throwable 
!= null) {
-                                                                       
logger.error("Could not retrieve the redirect address.", throwable);
+                                                               try {
+                                                                       if 
(throwable != null) {
+                                                                               
logger.error("Could not retrieve the redirect address.", throwable);
 
                                                                        
HandlerUtils.sendErrorResponse(
                                                                                
channelHandlerContext,
                                                                                
routed.request(),
                                                                                
new ErrorResponseBody("Could not retrieve the redirect address of the current 
leader. Please try to refresh."),
                                                                                
HttpResponseStatus.INTERNAL_SERVER_ERROR);
-                                                               } else if 
(optRedirectAddress.isPresent()) {
-                                                                       
response = HandlerRedirectUtils.getRedirectResponse(
-                                                                               
optRedirectAddress.get(),
-                                                                               
routed.path());
-
-                                                                       
KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
-                                                               } else {
-                                                                       try {
-                                                                               
respondAsLeader(channelHandlerContext, routed, gateway);
-                                                                       } catch 
(Exception e) {
-                                                                               
logger.error("Error while responding as leader.", e);
+                                                                       } else 
if (optRedirectAddress.isPresent()) {
+                                                                               
response = HandlerRedirectUtils.getRedirectResponse(
+                                                                               
        optRedirectAddress.get(),
+                                                                               
        routed.path());
+
+                                                                               
KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
+                                                                       } else {
+                                                                               
try {
+                                                                               
        respondAsLeader(channelHandlerContext, routed, gateway);
+                                                                               
} catch (Exception e) {
+                                                                               
        logger.error("Error while responding as leader.", e);
                                                                                
HandlerUtils.sendErrorResponse(
-                                                                               
        channelHandlerContext,
-                                                                               
        routed.request(),
+                                                                               
                channelHandlerContext,
+                                                                               
                routed.request(),
                                                                                
        new ErrorResponseBody("Error while responding to the request."),
                                                                                
        HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                                                               
}
                                                                        }
+                                                               } finally {
+                                                                       // 
release the message after processing it asynchronously
+                                                                       
ReferenceCountUtil.release(routed);
                                                                }
                                                        }
                                                );
@@ -142,19 +153,21 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                                                        routed.request(),
                                                        new 
ErrorResponseBody("Service temporarily unavailable due to an ongoing leader 
election. Please refresh."),
                                                        
HttpResponseStatus.SERVICE_UNAVAILABLE));
-                       } else {
+
+                       } catch (Throwable throwable) {
+                               logger.warn("Error occurred while processing 
web request.", throwable);
+
                                HandlerUtils.sendErrorResponse(
                                        channelHandlerContext,
                                        routed.request(),
-                                       new ErrorResponseBody("Local address 
has not been resolved. This indicates an internal error."),
+                                       new ErrorResponseBody("Error occurred 
in RedirectHandler: " + throwable.getMessage() + '.'),
                                        
HttpResponseStatus.INTERNAL_SERVER_ERROR);
                        }
-               } catch (Throwable throwable) {
-                       logger.warn("Error occurred while processing web 
request.", throwable);
+               } else {
                        HandlerUtils.sendErrorResponse(
                                channelHandlerContext,
                                routed.request(),
-                               new ErrorResponseBody("Error occurred in 
RedirectHandler: " + throwable.getMessage() + '.'),
+                               new ErrorResponseBody("Local address has not 
been resolved. This indicates an internal error."),
                                HttpResponseStatus.INTERNAL_SERVER_ERROR);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index db13633..9340fa2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -21,8 +21,15 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.FlinkException;
 
@@ -34,15 +41,16 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static 
org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Responder that returns the status of the Flink cluster, such as how many
  * TaskManagers are currently connected, and how many jobs are running.
  */
-public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler 
implements LegacyRestHandler<DispatcherGateway, StatusOverviewWithVersion, 
EmptyMessageParameters> {
+
 
-       private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
 
        private static final String version = 
EnvironmentInformation.getVersion();
 
@@ -74,16 +82,16 @@ public class ClusterOverviewHandler extends 
AbstractJsonRequestHandler {
                                                        JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                                                        gen.writeStartObject();
-                                                       
gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
-                                                       
gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
-                                                       
gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
-                                                       
gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
-                                                       
gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
-                                                       
gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
-                                                       
gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
-                                                       
gen.writeStringField("flink-version", version);
+                                                       
gen.writeNumberField(StatusOverview.FIELD_NAME_TASKMANAGERS, 
overview.getNumTaskManagersConnected());
+                                                       
gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_TOTAL, 
overview.getNumSlotsTotal());
+                                                       
gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_AVAILABLE, 
overview.getNumSlotsAvailable());
+                                                       
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_RUNNING, 
overview.getNumJobsRunningOrPending());
+                                                       
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FINISHED, 
overview.getNumJobsFinished());
+                                                       
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_CANCELLED, 
overview.getNumJobsCancelled());
+                                                       
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FAILED, 
overview.getNumJobsFailed());
+                                                       
gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_VERSION, version);
                                                        if 
(!commitID.equals(EnvironmentInformation.UNKNOWN)) {
-                                                               
gen.writeStringField("flink-commit", commitID);
+                                                               
gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_COMMIT, commitID);
                                                        }
                                                        gen.writeEndObject();
 
@@ -102,4 +110,12 @@ public class ClusterOverviewHandler extends 
AbstractJsonRequestHandler {
                        return FutureUtils.completedExceptionally(new 
FlinkException("Failed to fetch list of all running jobs: ", e));
                }
        }
+
+       @Override
+       public CompletableFuture<StatusOverviewWithVersion> 
handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, 
DispatcherGateway gateway) {
+               CompletableFuture<StatusOverview> overviewFuture = 
gateway.requestStatusOverview(timeout);
+
+               return overviewFuture.thenApply(
+                       statusOverview -> 
StatusOverviewWithVersion.fromStatusOverview(statusOverview, version, 
commitID));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
new file mode 100644
index 0000000..f0f98ec
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link ClusterOverviewHandler}.
+ */
+public final class ClusterOverviewHeaders implements 
MessageHeaders<EmptyRequestBody, StatusOverviewWithVersion, 
EmptyMessageParameters> {
+
+       private static final ClusterOverviewHeaders INSTANCE = new 
ClusterOverviewHeaders();
+
+       public static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+
+       // make this class a singleton
+       private ClusterOverviewHeaders() {}
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return CLUSTER_OVERVIEW_REST_PATH;
+       }
+
+       @Override
+       public Class<StatusOverviewWithVersion> getResponseClass() {
+               return StatusOverviewWithVersion.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public EmptyMessageParameters getUnresolvedMessageParameters() {
+               return EmptyMessageParameters.getInstance();
+       }
+
+       public static ClusterOverviewHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
new file mode 100644
index 0000000..82889bd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * MessageParameters implementation which has no parameters.
+ */
+public class EmptyMessageParameters extends MessageParameters {
+
+       private static final EmptyMessageParameters INSTANCE = new 
EmptyMessageParameters();
+
+       private EmptyMessageParameters() {}
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return Collections.emptyList();
+       }
+
+       public static EmptyMessageParameters getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
new file mode 100644
index 0000000..603c3d4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Request body for requests which do not have a request payload.
+ */
+public class EmptyRequestBody implements RequestBody {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbabdb1c/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
new file mode 100644
index 0000000..d69049e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StatusOverviewWithVersion}.
+ */
+public class StatusOverviewWithVersionTest extends TestLogger {
+
+       /**
+        * Tests that we can marshal and unmarshal StatusOverviewWithVersion.
+        */
+       @Test
+       public void testJsonMarshalling() throws JsonProcessingException {
+               final StatusOverviewWithVersion expected = new 
StatusOverviewWithVersion(
+                       1,
+                       3,
+                       3,
+                       7,
+                       4,
+                       2,
+                       0,
+                       "version",
+                       "commit");
+
+               ObjectMapper objectMapper = 
RestMapperUtils.getStrictObjectMapper();
+
+               JsonNode json = objectMapper.valueToTree(expected);
+
+               final StatusOverviewWithVersion unmarshalled = 
objectMapper.treeToValue(json, StatusOverviewWithVersion.class);
+
+               assertEquals(expected, unmarshalled);
+       }
+}

Reply via email to