Repository: flink
Updated Branches:
  refs/heads/master dbabdb1cc -> c6243b8b1


[FLINK-7535] Port DashboardConfigHandler to new REST endpoint

Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this
commit defines the appropriate DashboardConfigurationHeaders.

The DispatcherRestEndpoint registers the DashboardConfigHandler.

This closes #4604.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6243b8b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6243b8b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6243b8b

Branch: refs/heads/master
Commit: c6243b8b1de6117623d3c4255f47f062d10c4602
Parents: dbabdb1
Author: Till Rohrmann <[email protected]>
Authored: Mon Aug 21 15:11:08 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Sep 21 09:30:50 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/history/HistoryServer.java       |   4 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  35 +++--
 .../entrypoint/SessionClusterEntrypoint.java    |   9 +-
 .../webmonitor/StatusOverviewWithVersion.java   | 128 -----------------
 .../rest/handler/RestHandlerConfiguration.java  |  68 +++++++++
 .../handler/legacy/ClusterOverviewHandler.java  |   2 +-
 .../handler/legacy/DashboardConfigHandler.java  |  45 +++---
 .../legacy/messages/DashboardConfiguration.java | 137 +++++++++++++++++++
 .../messages/StatusOverviewWithVersion.java     | 130 ++++++++++++++++++
 .../rest/messages/ClusterOverviewHeaders.java   |   2 +-
 .../messages/DashboardConfigurationHeaders.java |  70 ++++++++++
 .../ZooKeeperLeaderElectionTest.java            |   1 -
 .../StatusOverviewWithVersionTest.java          |  60 --------
 .../legacy/DashboardConfigHandlerTest.java      |  22 +--
 .../messages/DashboardConfigurationTest.java    |  56 ++++++++
 .../messages/StatusOverviewWithVersionTest.java |  60 ++++++++
 16 files changed, 592 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 01228d5..f9aea22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
@@ -47,6 +48,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.nio.file.Files;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -289,7 +291,7 @@ public class HistoryServer {
 
        private void createDashboardConfigFile() throws IOException {
                try (FileWriter fw = createOrGetFile(webDir, "config")) {
-                       
fw.write(DashboardConfigHandler.createConfigJson(webRefreshIntervalMillis));
+                       
fw.write(DashboardConfigHandler.createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis,
 ZonedDateTime.now())));
                        fw.flush();
                } catch (IOException ioe) {
                        LOG.error("Failed to write config file.");

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 1f64c67..6054a7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -20,15 +20,19 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -51,26 +55,25 @@ import java.util.concurrent.Executor;
 public class DispatcherRestEndpoint extends RestServerEndpoint {
 
        private final GatewayRetriever<DispatcherGateway> leaderRetriever;
-       private final Time timeout;
-       private final File tmpDir;
+       private final RestHandlerConfiguration restConfiguration;
        private final Executor executor;
 
        public DispatcherRestEndpoint(
                        RestServerEndpointConfiguration configuration,
                        GatewayRetriever<DispatcherGateway> leaderRetriever,
-                       Time timeout,
-                       File tmpDir,
+                       RestHandlerConfiguration restConfiguration,
                        Executor executor) {
                super(configuration);
                this.leaderRetriever = 
Preconditions.checkNotNull(leaderRetriever);
-               this.timeout = Preconditions.checkNotNull(timeout);
-               this.tmpDir = Preconditions.checkNotNull(tmpDir);
+               this.restConfiguration = 
Preconditions.checkNotNull(restConfiguration);
                this.executor = Preconditions.checkNotNull(executor);
        }
 
        @Override
        protected Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> 
restAddressFuture) {
-               ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = new ArrayList<>(2);
+               ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = new ArrayList<>(3);
+
+               final Time timeout = restConfiguration.getTimeout();
 
                LegacyRestHandlerAdapter<DispatcherGateway, 
StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new 
LegacyRestHandlerAdapter<>(
                        restAddressFuture,
@@ -81,7 +84,16 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
                                executor,
                                timeout));
 
-               handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), 
clusterOverviewHandler));
+               LegacyRestHandlerAdapter<DispatcherGateway, 
DashboardConfiguration, EmptyMessageParameters> dashboardConfigurationHandler = 
new LegacyRestHandlerAdapter<>(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       DashboardConfigurationHeaders.getInstance(),
+                       new DashboardConfigHandler(
+                               executor,
+                               restConfiguration.getRefreshInterval()));
+
+               final File tmpDir = restConfiguration.getTmpDir();
 
                Optional<StaticFileServerHandler<DispatcherGateway>> 
optWebContent;
 
@@ -96,6 +108,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
                        optWebContent = Optional.empty();
                }
 
+               handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), 
clusterOverviewHandler));
+               
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), 
dashboardConfigurationHandler));
+
                optWebContent.ifPresent(
                        webContent -> 
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), 
webContent)));
 
@@ -106,6 +121,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
        public void shutdown(Time timeout) {
                super.shutdown(timeout);
 
+               final File tmpDir = restConfiguration.getTmpDir();
+
                try {
                        log.info("Removing cache directory {}", tmpDir);
                        FileUtils.deleteDirectory(tmpDir);

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 9f4e04a..e394854 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
-import java.io.File;
 import java.util.Optional;
 import java.util.concurrent.Executor;
 
@@ -157,14 +156,10 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
                        LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever,
                        Executor executor) throws Exception {
 
-               Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-               File tmpDir = new 
File(configuration.getString(WebOptions.TMP_DIR));
-
                return new DispatcherRestEndpoint(
                        
RestServerEndpointConfiguration.fromConfiguration(configuration),
                        dispatcherGatewayRetriever,
-                       timeout,
-                       tmpDir,
+                       
RestHandlerConfiguration.fromConfiguration(configuration),
                        executor);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
deleted file mode 100644
index 9029537..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * Status overview message including the current Flink version and commit id.
- */
-public class StatusOverviewWithVersion extends StatusOverview implements 
ResponseBody {
-
-       private static final long serialVersionUID = 5000058311783413216L;
-
-       public static final String FIELD_NAME_VERSION = "flink-version";
-       public static final String FIELD_NAME_COMMIT = "flink-commit";
-
-       @JsonProperty(FIELD_NAME_VERSION)
-       private final String version;
-
-       @JsonProperty(FIELD_NAME_COMMIT)
-       private final String commitId;
-
-       @JsonCreator
-       public StatusOverviewWithVersion(
-                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int 
numTaskManagersConnected,
-                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
-                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int 
numSlotsAvailable,
-                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
-                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
-                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
-                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
-                       @JsonProperty(FIELD_NAME_VERSION) String version,
-                       @JsonProperty(FIELD_NAME_COMMIT) String commitId) {
-               super(
-                       numTaskManagersConnected,
-                       numSlotsTotal,
-                       numSlotsAvailable,
-                       numJobsRunningOrPending,
-                       numJobsFinished,
-                       numJobsCancelled,
-                       numJobsFailed);
-
-               this.version = Preconditions.checkNotNull(version);
-               this.commitId = Preconditions.checkNotNull(commitId);
-       }
-
-       public StatusOverviewWithVersion(
-                       int numTaskManagersConnected,
-                       int numSlotsTotal,
-                       int numSlotsAvailable,
-                       JobsOverview jobs1,
-                       JobsOverview jobs2,
-                       String version,
-                       String commitId) {
-               super(numTaskManagersConnected, numSlotsTotal, 
numSlotsAvailable, jobs1, jobs2);
-
-               this.version = Preconditions.checkNotNull(version);
-               this.commitId = Preconditions.checkNotNull(commitId);
-       }
-
-       public static StatusOverviewWithVersion 
fromStatusOverview(StatusOverview statusOverview, String version, String 
commitId) {
-               return new StatusOverviewWithVersion(
-                       statusOverview.getNumTaskManagersConnected(),
-                       statusOverview.getNumSlotsTotal(),
-                       statusOverview.getNumSlotsAvailable(),
-                       statusOverview.getNumJobsRunningOrPending(),
-                       statusOverview.getNumJobsFinished(),
-                       statusOverview.getNumJobsCancelled(),
-                       statusOverview.getNumJobsFailed(),
-                       version,
-                       commitId);
-       }
-
-       public String getVersion() {
-               return version;
-       }
-
-       public String getCommitId() {
-               return commitId;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               if (!super.equals(o)) {
-                       return false;
-               }
-
-               StatusOverviewWithVersion that = (StatusOverviewWithVersion) o;
-
-               return Objects.equals(version, that.getVersion()) && 
Objects.equals(commitId, that.getCommitId());
-       }
-
-       @Override
-       public int hashCode() {
-               int result = super.hashCode();
-               result = 31 * result + (version != null ? version.hashCode() : 
0);
-               result = 31 * result + (commitId != null ? commitId.hashCode() 
: 0);
-               return result;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
new file mode 100644
index 0000000..9220bd9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.util.Preconditions;
+
+import java.io.File;
+
+/**
+ * Configuration object containing values for the rest handler configuration.
+ */
+public class RestHandlerConfiguration {
+
+       private final long refreshInterval;
+
+       private final Time timeout;
+
+       private final File tmpDir;
+
+       public RestHandlerConfiguration(long refreshInterval, Time timeout, 
File tmpDir) {
+               Preconditions.checkArgument(refreshInterval > 0L, "The refresh 
interval (ms) should be larger than 0.");
+               this.refreshInterval = refreshInterval;
+
+               this.timeout = Preconditions.checkNotNull(timeout);
+               this.tmpDir = Preconditions.checkNotNull(tmpDir);
+       }
+
+       public long getRefreshInterval() {
+               return refreshInterval;
+       }
+
+       public Time getTimeout() {
+               return timeout;
+       }
+
+       public File getTmpDir() {
+               return tmpDir;
+       }
+
+       public static RestHandlerConfiguration fromConfiguration(Configuration 
configuration) {
+               final long refreshInterval = 
configuration.getLong(WebOptions.REFRESH_INTERVAL);
+
+               final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+               final File tmpDir = new 
File(configuration.getString(WebOptions.TMP_DIR));
+
+               return new RestHandlerConfiguration(refreshInterval, timeout, 
tmpDir);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 9340fa2..480c9e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.util.EnvironmentInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
index e8854f4..0cef5fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
@@ -18,15 +18,20 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.time.ZonedDateTime;
 import java.util.Map;
-import java.util.TimeZone;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -35,16 +40,21 @@ import java.util.concurrent.Executor;
  * against this web server should behave. It defines for example the refresh 
interval,
  * and time zone of the server timestamps.
  */
-public class DashboardConfigHandler extends AbstractJsonRequestHandler {
+public class DashboardConfigHandler extends AbstractJsonRequestHandler 
implements LegacyRestHandler<DispatcherGateway, DashboardConfiguration, 
EmptyMessageParameters> {
 
-       private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
+       public static final String DASHBOARD_CONFIG_REST_PATH = "/config";
 
        private final String configString;
 
+       private final DashboardConfiguration dashboardConfiguration;
+
        public DashboardConfigHandler(Executor executor, long refreshInterval) {
                super(executor);
+
+               dashboardConfiguration = 
DashboardConfiguration.from(refreshInterval, ZonedDateTime.now());
+
                try {
-                       this.configString = createConfigJson(refreshInterval);
+                       this.configString = 
createConfigJson(dashboardConfiguration);
                }
                catch (Exception e) {
                        // should never happen
@@ -58,28 +68,25 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
        }
 
        @Override
+       public CompletableFuture<DashboardConfiguration> 
handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, 
DispatcherGateway gateway) {
+               return 
CompletableFuture.completedFuture(dashboardConfiguration);
+       }
+
+       @Override
        public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
                return CompletableFuture.completedFuture(configString);
        }
 
-       public static String createConfigJson(long refreshInterval) throws 
IOException {
+       public static String createConfigJson(DashboardConfiguration 
dashboardConfiguration) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-               TimeZone timeZone = TimeZone.getDefault();
-               String timeZoneName = timeZone.getDisplayName();
-               long timeZoneOffset = timeZone.getRawOffset();
-
                gen.writeStartObject();
-               gen.writeNumberField("refresh-interval", refreshInterval);
-               gen.writeNumberField("timezone-offset", timeZoneOffset);
-               gen.writeStringField("timezone-name", timeZoneName);
-               gen.writeStringField("flink-version", 
EnvironmentInformation.getVersion());
-
-               EnvironmentInformation.RevisionInformation revision = 
EnvironmentInformation.getRevisionInformation();
-               if (revision != null) {
-                       gen.writeStringField("flink-revision", 
revision.commitId + " @ " + revision.commitDate);
-               }
+               
gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, 
dashboardConfiguration.getRefreshInterval());
+               
gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, 
dashboardConfiguration.getTimeZoneOffset());
+               
gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, 
dashboardConfiguration.getTimeZoneName());
+               
gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, 
dashboardConfiguration.getFlinkVersion());
+               
gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, 
dashboardConfiguration.getFlinkRevision());
 
                gen.writeEndObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java
new file mode 100644
index 0000000..cfb3aaa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.time.ZonedDateTime;
+import java.time.format.TextStyle;
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Response body of the {@link DashboardConfigHandler}. It bundles the general
+ * dashboard settings: the refresh interval, the time zone (name and offset)
+ * and the Flink version and revision.
+ */
+public class DashboardConfiguration implements ResponseBody {
+
+       // JSON field names, used by the Jackson annotations below and by the handler's JSON writer.
+       public static final String FIELD_NAME_REFRESH_INTERVAL = "refresh-interval";
+       public static final String FIELD_NAME_TIMEZONE_OFFSET = "timezone-offset";
+       public static final String FIELD_NAME_TIMEZONE_NAME = "timezone-name";
+       public static final String FIELD_NAME_FLINK_VERSION = "flink-version";
+       public static final String FIELD_NAME_FLINK_REVISION = "flink-revision";
+
+       @JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
+       private final long refreshInterval;
+
+       @JsonProperty(FIELD_NAME_TIMEZONE_NAME)
+       private final String timeZoneName;
+
+       @JsonProperty(FIELD_NAME_TIMEZONE_OFFSET)
+       private final int timeZoneOffset;
+
+       @JsonProperty(FIELD_NAME_FLINK_VERSION)
+       private final String flinkVersion;
+
+       @JsonProperty(FIELD_NAME_FLINK_REVISION)
+       private final String flinkRevision;
+
+       /**
+        * Creates a dashboard configuration from its individual values. This is
+        * also the constructor Jackson uses for deserialization.
+        *
+        * @param refreshInterval refresh interval value
+        * @param timeZoneName display name of the time zone, must not be null
+        * @param timeZoneOffset offset of the time zone
+        * @param flinkVersion Flink version string, must not be null
+        * @param flinkRevision Flink revision string, must not be null
+        */
+       @JsonCreator
+       public DashboardConfiguration(
+                       @JsonProperty(FIELD_NAME_REFRESH_INTERVAL) long refreshInterval,
+                       @JsonProperty(FIELD_NAME_TIMEZONE_NAME) String timeZoneName,
+                       @JsonProperty(FIELD_NAME_TIMEZONE_OFFSET) int timeZoneOffset,
+                       @JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
+                       @JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision) {
+               this.refreshInterval = refreshInterval;
+               this.timeZoneName = Preconditions.checkNotNull(timeZoneName);
+               this.timeZoneOffset = timeZoneOffset;
+               this.flinkVersion = Preconditions.checkNotNull(flinkVersion);
+               this.flinkRevision = Preconditions.checkNotNull(flinkRevision);
+       }
+
+       public long getRefreshInterval() {
+               return refreshInterval;
+       }
+
+       public int getTimeZoneOffset() {
+               return timeZoneOffset;
+       }
+
+       public String getTimeZoneName() {
+               return timeZoneName;
+       }
+
+       public String getFlinkVersion() {
+               return flinkVersion;
+       }
+
+       public String getFlinkRevision() {
+               return flinkRevision;
+       }
+
+       /**
+        * Two configurations are equal if they are of the same class and all
+        * five values match.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               final DashboardConfiguration other = (DashboardConfiguration) o;
+
+               // compare the primitive values first, then the strings
+               if (refreshInterval != other.refreshInterval || timeZoneOffset != other.timeZoneOffset) {
+                       return false;
+               }
+               return Objects.equals(timeZoneName, other.timeZoneName)
+                       && Objects.equals(flinkVersion, other.flinkVersion)
+                       && Objects.equals(flinkRevision, other.flinkRevision);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision);
+       }
+
+       /**
+        * Builds the dashboard configuration for the running Flink process.
+        *
+        * <p>Version and revision are read from {@link EnvironmentInformation}; if no
+        * revision information is available, the revision is reported as
+        * {@code "unknown revision"}.
+        *
+        * @param refreshInterval refresh interval value to report
+        * @param zonedDateTime date time whose zone name and offset are reported
+        * @return the assembled dashboard configuration
+        */
+       public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime) {
+               final String version = EnvironmentInformation.getVersion();
+
+               final EnvironmentInformation.RevisionInformation revisionInfo = EnvironmentInformation.getRevisionInformation();
+               final String revision = revisionInfo == null
+                       ? "unknown revision"
+                       : revisionInfo.commitId + " @ " + revisionInfo.commitDate;
+
+               // full zone name rendered in the JVM's default locale
+               final String zoneName = zonedDateTime.getZone().getDisplayName(TextStyle.FULL, Locale.getDefault());
+
+               // offset of the given date time, converted from seconds to milliseconds
+               final int offsetMillis = zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000;
+
+               return new DashboardConfiguration(refreshInterval, zoneName, offsetMillis, version, revision);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java
new file mode 100644
index 0000000..f001afc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * A {@link StatusOverview} extended by the Flink version and commit id, usable
+ * as a REST response body.
+ */
+public class StatusOverviewWithVersion extends StatusOverview implements ResponseBody {
+
+       private static final long serialVersionUID = 5000058311783413216L;
+
+       public static final String FIELD_NAME_VERSION = "flink-version";
+       public static final String FIELD_NAME_COMMIT = "flink-commit";
+
+       @JsonProperty(FIELD_NAME_VERSION)
+       private final String version;
+
+       @JsonProperty(FIELD_NAME_COMMIT)
+       private final String commitId;
+
+       /**
+        * Creates the overview from plain counters plus version and commit id.
+        * This is also the constructor Jackson uses for deserialization.
+        *
+        * @param version Flink version, must not be null
+        * @param commitId Flink commit id, must not be null
+        */
+       @JsonCreator
+       public StatusOverviewWithVersion(
+                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
+                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
+                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
+                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
+                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
+                       @JsonProperty(FIELD_NAME_VERSION) String version,
+                       @JsonProperty(FIELD_NAME_COMMIT) String commitId) {
+               super(
+                       numTaskManagersConnected,
+                       numSlotsTotal,
+                       numSlotsAvailable,
+                       numJobsRunningOrPending,
+                       numJobsFinished,
+                       numJobsCancelled,
+                       numJobsFailed);
+
+               this.version = Preconditions.checkNotNull(version);
+               this.commitId = Preconditions.checkNotNull(commitId);
+       }
+
+       /**
+        * Creates the overview from two {@link JobsOverview} instances, which are
+        * handed to the matching {@link StatusOverview} constructor.
+        *
+        * @param version Flink version, must not be null
+        * @param commitId Flink commit id, must not be null
+        */
+       public StatusOverviewWithVersion(
+                       int numTaskManagersConnected,
+                       int numSlotsTotal,
+                       int numSlotsAvailable,
+                       JobsOverview jobs1,
+                       JobsOverview jobs2,
+                       String version,
+                       String commitId) {
+               super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
+
+               this.version = Preconditions.checkNotNull(version);
+               this.commitId = Preconditions.checkNotNull(commitId);
+       }
+
+       /**
+        * Copies all counters of the given status overview and attaches the
+        * version and commit id to them.
+        *
+        * @param statusOverview overview to copy the counters from
+        * @param version Flink version, must not be null
+        * @param commitId Flink commit id, must not be null
+        * @return the overview enriched with version information
+        */
+       public static StatusOverviewWithVersion fromStatusOverview(StatusOverview statusOverview, String version, String commitId) {
+               return new StatusOverviewWithVersion(
+                       statusOverview.getNumTaskManagersConnected(),
+                       statusOverview.getNumSlotsTotal(),
+                       statusOverview.getNumSlotsAvailable(),
+                       statusOverview.getNumJobsRunningOrPending(),
+                       statusOverview.getNumJobsFinished(),
+                       statusOverview.getNumJobsCancelled(),
+                       statusOverview.getNumJobsFailed(),
+                       version,
+                       commitId);
+       }
+
+       public String getVersion() {
+               return version;
+       }
+
+       public String getCommitId() {
+               return commitId;
+       }
+
+       /**
+        * Equal if the other object has the same class, the super class considers
+        * both equal, and version and commit id match.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+                       return false;
+               }
+
+               final StatusOverviewWithVersion other = (StatusOverviewWithVersion) o;
+
+               return Objects.equals(version, other.getVersion())
+                       && Objects.equals(commitId, other.getCommitId());
+       }
+
+       @Override
+       public int hashCode() {
+               // Objects.hashCode yields 0 for null, matching a manual null check
+               int hash = super.hashCode();
+               hash = 31 * hash + Objects.hashCode(version);
+               hash = 31 * hash + Objects.hashCode(commitId);
+               return hash;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
index f0f98ec..887ce2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
new file mode 100644
index 0000000..cc03b7b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers describing the REST call served by the {@link DashboardConfigHandler}:
+ * a GET request without body or parameters that is answered with a
+ * {@link DashboardConfiguration}.
+ */
+public final class DashboardConfigurationHeaders implements MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> {
+
+       /** The single instance of these headers. */
+       private static final DashboardConfigurationHeaders INSTANCE = new DashboardConfigurationHeaders();
+
+       /** Private on purpose: instances are only handed out via {@link #getInstance()}. */
+       private DashboardConfigurationHeaders() {}
+
+       /**
+        * Returns the singleton instance of these headers.
+        */
+       public static DashboardConfigurationHeaders getInstance() {
+               return INSTANCE;
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return DashboardConfigHandler.DASHBOARD_CONFIG_REST_PATH;
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<DashboardConfiguration> getResponseClass() {
+               return DashboardConfiguration.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public EmptyMessageParameters getUnresolvedMessageParameters() {
+               return EmptyMessageParameters.getInstance();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 73cf063..e815a74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -24,7 +24,6 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
deleted file mode 100644
index d69049e..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.TestLogger;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link StatusOverviewWithVersion}.
- */
-public class StatusOverviewWithVersionTest extends TestLogger {
-
-       /**
-        * Tests that we can marshal and unmarshal StatusOverviewWithVersion.
-        */
-       @Test
-       public void testJsonMarshalling() throws JsonProcessingException {
-               final StatusOverviewWithVersion expected = new 
StatusOverviewWithVersion(
-                       1,
-                       3,
-                       3,
-                       7,
-                       4,
-                       2,
-                       0,
-                       "version",
-                       "commit");
-
-               ObjectMapper objectMapper = 
RestMapperUtils.getStrictObjectMapper();
-
-               JsonNode json = objectMapper.valueToTree(expected);
-
-               final StatusOverviewWithVersion unmarshalled = 
objectMapper.treeToValue(json, StatusOverviewWithVersion.class);
-
-               assertEquals(expected, unmarshalled);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
index 06a99fe..73d9157 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
@@ -19,19 +19,20 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.concurrent.Executors;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.TestLogger;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.TimeZone;
+import java.time.ZonedDateTime;
 
 /**
  * Tests for the DashboardConfigHandler.
  */
-public class DashboardConfigHandlerTest {
+public class DashboardConfigHandlerTest extends TestLogger {
        @Test
        public void testGetPaths() {
                DashboardConfigHandler handler = new 
DashboardConfigHandler(Executors.directExecutor(), 10000L);
@@ -43,17 +44,18 @@ public class DashboardConfigHandlerTest {
        @Test
        public void testJsonGeneration() throws Exception {
                long refreshInterval = 12345;
-               TimeZone timeZone = TimeZone.getDefault();
-               EnvironmentInformation.RevisionInformation revision = 
EnvironmentInformation.getRevisionInformation();
+               final ZonedDateTime zonedDateTime = ZonedDateTime.now();
 
-               String json = 
DashboardConfigHandler.createConfigJson(refreshInterval);
+               final DashboardConfiguration dashboardConfiguration = 
DashboardConfiguration.from(refreshInterval, zonedDateTime);
+
+               String json = 
DashboardConfigHandler.createConfigJson(dashboardConfiguration);
 
                JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
 
                Assert.assertEquals(refreshInterval, 
result.get("refresh-interval").asLong());
-               Assert.assertEquals(timeZone.getDisplayName(), 
result.get("timezone-name").asText());
-               Assert.assertEquals(timeZone.getRawOffset(), 
result.get("timezone-offset").asLong());
-               Assert.assertEquals(EnvironmentInformation.getVersion(), 
result.get("flink-version").asText());
-               Assert.assertEquals(revision.commitId + " @ " + 
revision.commitDate, result.get("flink-revision").asText());
+               Assert.assertEquals(dashboardConfiguration.getTimeZoneName(), 
result.get("timezone-name").asText());
+               Assert.assertEquals(dashboardConfiguration.getTimeZoneOffset(), 
result.get("timezone-offset").asInt());
+               Assert.assertEquals(dashboardConfiguration.getFlinkVersion(), 
result.get("flink-version").asText());
+               Assert.assertEquals(dashboardConfiguration.getFlinkRevision(), 
result.get("flink-revision").asText());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
new file mode 100644
index 0000000..9a9046b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link DashboardConfiguration}.
+ */
+public class DashboardConfigurationTest extends TestLogger {
+
+       /**
+        * Verifies that a {@link DashboardConfiguration} survives a JSON round trip,
+        * i.e. marshalling followed by unmarshalling yields an equal object.
+        */
+       @Test
+       public void testJsonMarshalling() throws JsonProcessingException {
+               final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+               final DashboardConfiguration original = new DashboardConfiguration(1L, "foobar", 42, "version", "revision");
+
+               // object -> JSON tree -> object
+               final JsonNode jsonTree = mapper.valueToTree(original);
+               final DashboardConfiguration roundTripped = mapper.treeToValue(jsonTree, DashboardConfiguration.class);
+
+               assertEquals(original, roundTripped);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
new file mode 100644
index 0000000..a1bbc9a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StatusOverviewWithVersion}.
+ */
+public class StatusOverviewWithVersionTest extends TestLogger {
+
+       /**
+        * Verifies that a {@link StatusOverviewWithVersion} survives a JSON round trip,
+        * i.e. marshalling followed by unmarshalling yields an equal object.
+        */
+       @Test
+       public void testJsonMarshalling() throws JsonProcessingException {
+               final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+               final StatusOverviewWithVersion original = new StatusOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit");
+
+               // object -> JSON tree -> object
+               final JsonNode jsonTree = mapper.valueToTree(original);
+               final StatusOverviewWithVersion roundTripped = mapper.treeToValue(jsonTree, StatusOverviewWithVersion.class);
+
+               assertEquals(original, roundTripped);
+       }
+}

Reply via email to