Repository: flink Updated Branches: refs/heads/master dbabdb1cc -> c6243b8b1
[FLINK-7535] Port DashboardConfigHandler to new REST endpoint Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this commit defines the appropriate DashboardConfigurationHeaders. The DispatcherRestEndpoint registers the DashboardConfigHandler. This closes #4604. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6243b8b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6243b8b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6243b8b Branch: refs/heads/master Commit: c6243b8b1de6117623d3c4255f47f062d10c4602 Parents: dbabdb1 Author: Till Rohrmann <[email protected]> Authored: Mon Aug 21 15:11:08 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Sep 21 09:30:50 2017 +0200 ---------------------------------------------------------------------- .../webmonitor/history/HistoryServer.java | 4 +- .../dispatcher/DispatcherRestEndpoint.java | 35 +++-- .../entrypoint/SessionClusterEntrypoint.java | 9 +- .../webmonitor/StatusOverviewWithVersion.java | 128 ----------------- .../rest/handler/RestHandlerConfiguration.java | 68 +++++++++ .../handler/legacy/ClusterOverviewHandler.java | 2 +- .../handler/legacy/DashboardConfigHandler.java | 45 +++--- .../legacy/messages/DashboardConfiguration.java | 137 +++++++++++++++++++ .../messages/StatusOverviewWithVersion.java | 130 ++++++++++++++++++ .../rest/messages/ClusterOverviewHeaders.java | 2 +- .../messages/DashboardConfigurationHeaders.java | 70 ++++++++++ .../ZooKeeperLeaderElectionTest.java | 1 - .../StatusOverviewWithVersionTest.java | 60 -------- .../legacy/DashboardConfigHandlerTest.java | 22 +-- .../messages/DashboardConfigurationTest.java | 56 ++++++++ .../messages/StatusOverviewWithVersionTest.java | 60 ++++++++ 16 files changed, 592 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 01228d5..f9aea22 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; @@ -47,6 +48,7 @@ import java.io.FileWriter; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.nio.file.Files; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -289,7 +291,7 @@ public class HistoryServer { private void createDashboardConfigFile() throws IOException { try (FileWriter fw = createOrGetFile(webDir, "config")) { - fw.write(DashboardConfigHandler.createConfigJson(webRefreshIntervalMillis)); + fw.write(DashboardConfigHandler.createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now()))); fw.flush(); } catch (IOException ioe) { LOG.error("Failed to write config file."); http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 1f64c67..6054a7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -20,15 +20,19 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; +import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; +import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -51,26 +55,25 @@ import java.util.concurrent.Executor; public class DispatcherRestEndpoint extends RestServerEndpoint { private final GatewayRetriever<DispatcherGateway> leaderRetriever; - private final Time timeout; - private final File tmpDir; + private final RestHandlerConfiguration restConfiguration; private final Executor executor; public DispatcherRestEndpoint( RestServerEndpointConfiguration configuration, GatewayRetriever<DispatcherGateway> leaderRetriever, - Time timeout, - File tmpDir, + RestHandlerConfiguration restConfiguration, Executor executor) { super(configuration); this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); - this.timeout = Preconditions.checkNotNull(timeout); - this.tmpDir = Preconditions.checkNotNull(tmpDir); + this.restConfiguration = Preconditions.checkNotNull(restConfiguration); this.executor = Preconditions.checkNotNull(executor); } @Override protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { - ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(2); + ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3); + + final Time timeout = restConfiguration.getTimeout(); LegacyRestHandlerAdapter<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>( restAddressFuture, @@ -81,7 +84,16 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { executor, timeout)); - handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler)); + LegacyRestHandlerAdapter<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> dashboardConfigurationHandler = new LegacyRestHandlerAdapter<>( + restAddressFuture, + leaderRetriever, + timeout, + DashboardConfigurationHeaders.getInstance(), + new DashboardConfigHandler( + executor, + restConfiguration.getRefreshInterval())); + + final File tmpDir = restConfiguration.getTmpDir(); Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent; @@ -96,6 +108,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { optWebContent = Optional.empty(); } + handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler)); + handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler)); + optWebContent.ifPresent( webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent))); @@ -106,6 +121,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { public void shutdown(Time timeout) { super.shutdown(timeout); + final File tmpDir = restConfiguration.getTmpDir(); + try { log.info("Removing cache directory {}", tmpDir); FileUtils.deleteDirectory(tmpDir); http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index 9f4e04a..e394854 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.entrypoint; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.dispatcher.Dispatcher; @@ -34,6 +33,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; @@ -41,7 +41,6 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import java.io.File; import java.util.Optional; import java.util.concurrent.Executor; @@ -157,14 +156,10 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever, Executor executor) throws Exception { - Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); - File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR)); - return new DispatcherRestEndpoint( RestServerEndpointConfiguration.fromConfiguration(configuration), dispatcherGatewayRetriever, - timeout, - tmpDir, + RestHandlerConfiguration.fromConfiguration(configuration), executor); } http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java deleted file mode 100644 index 9029537..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.messages.webmonitor; - -import org.apache.flink.runtime.rest.messages.ResponseBody; -import org.apache.flink.util.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Objects; - -/** - * Status overview message including the current Flink version and commit id. - */ -public class StatusOverviewWithVersion extends StatusOverview implements ResponseBody { - - private static final long serialVersionUID = 5000058311783413216L; - - public static final String FIELD_NAME_VERSION = "flink-version"; - public static final String FIELD_NAME_COMMIT = "flink-commit"; - - @JsonProperty(FIELD_NAME_VERSION) - private final String version; - - @JsonProperty(FIELD_NAME_COMMIT) - private final String commitId; - - @JsonCreator - public StatusOverviewWithVersion( - @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected, - @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal, - @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable, - @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending, - @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished, - @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled, - @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed, - @JsonProperty(FIELD_NAME_VERSION) String version, - @JsonProperty(FIELD_NAME_COMMIT) String commitId) { - super( - numTaskManagersConnected, - numSlotsTotal, - numSlotsAvailable, - numJobsRunningOrPending, - numJobsFinished, - numJobsCancelled, - numJobsFailed); - - this.version = Preconditions.checkNotNull(version); - this.commitId = Preconditions.checkNotNull(commitId); - } - - public StatusOverviewWithVersion( - int numTaskManagersConnected, - int numSlotsTotal, - int numSlotsAvailable, - JobsOverview jobs1, - JobsOverview jobs2, - String version, - String commitId) { - super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2); - - this.version = Preconditions.checkNotNull(version); - this.commitId = Preconditions.checkNotNull(commitId); - } - - public static StatusOverviewWithVersion fromStatusOverview(StatusOverview statusOverview, String version, String commitId) { - return new StatusOverviewWithVersion( - statusOverview.getNumTaskManagersConnected(), - statusOverview.getNumSlotsTotal(), - statusOverview.getNumSlotsAvailable(), - statusOverview.getNumJobsRunningOrPending(), - statusOverview.getNumJobsFinished(), - statusOverview.getNumJobsCancelled(), - statusOverview.getNumJobsFailed(), - version, - commitId); - } - - public String getVersion() { - return version; - } - - public String getCommitId() { - return commitId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - - StatusOverviewWithVersion that = (StatusOverviewWithVersion) o; - - return Objects.equals(version, that.getVersion()) && Objects.equals(commitId, that.getCommitId()); - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (version != null ? version.hashCode() : 0); - result = 31 * result + (commitId != null ? commitId.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java new file mode 100644 index 0000000..9220bd9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.util.Preconditions; + +import java.io.File; + +/** + * Configuration object containing values for the rest handler configuration. + */ +public class RestHandlerConfiguration { + + private final long refreshInterval; + + private final Time timeout; + + private final File tmpDir; + + public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) { + Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0."); + this.refreshInterval = refreshInterval; + + this.timeout = Preconditions.checkNotNull(timeout); + this.tmpDir = Preconditions.checkNotNull(tmpDir); + } + + public long getRefreshInterval() { + return refreshInterval; + } + + public Time getTimeout() { + return timeout; + } + + public File getTmpDir() { + return tmpDir; + } + + public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { + final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL); + + final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); + + final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR)); + + return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java index 9340fa2..480c9e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java @@ -25,9 +25,9 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.StatusOverview; -import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.util.EnvironmentInformation; http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java index e8854f4..0cef5fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java @@ -18,15 +18,20 @@ package org.apache.flink.runtime.rest.handler.legacy; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import com.fasterxml.jackson.core.JsonGenerator; import java.io.IOException; import java.io.StringWriter; +import java.time.ZonedDateTime; import java.util.Map; -import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -35,16 +40,21 @@ import java.util.concurrent.Executor; * against this web server should behave. It defines for example the refresh interval, * and time zone of the server timestamps. */ -public class DashboardConfigHandler extends AbstractJsonRequestHandler { +public class DashboardConfigHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> { - private static final String DASHBOARD_CONFIG_REST_PATH = "/config"; + public static final String DASHBOARD_CONFIG_REST_PATH = "/config"; private final String configString; + private final DashboardConfiguration dashboardConfiguration; + public DashboardConfigHandler(Executor executor, long refreshInterval) { super(executor); + + dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now()); + try { - this.configString = createConfigJson(refreshInterval); + this.configString = createConfigJson(dashboardConfiguration); } catch (Exception e) { // should never happen @@ -58,28 +68,25 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler { } @Override + public CompletableFuture<DashboardConfiguration> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) { + return CompletableFuture.completedFuture(dashboardConfiguration); + } + + @Override public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { return CompletableFuture.completedFuture(configString); } - public static String createConfigJson(long refreshInterval) throws IOException { + public static String createConfigJson(DashboardConfiguration dashboardConfiguration) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - TimeZone timeZone = TimeZone.getDefault(); - String timeZoneName = timeZone.getDisplayName(); - long timeZoneOffset = timeZone.getRawOffset(); - gen.writeStartObject(); - gen.writeNumberField("refresh-interval", refreshInterval); - gen.writeNumberField("timezone-offset", timeZoneOffset); - gen.writeStringField("timezone-name", timeZoneName); - gen.writeStringField("flink-version", EnvironmentInformation.getVersion()); - - EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); - if (revision != null) { - gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate); - } + gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, dashboardConfiguration.getRefreshInterval()); + gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, dashboardConfiguration.getTimeZoneOffset()); + gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, dashboardConfiguration.getTimeZoneName()); + gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion()); + gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision()); gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java new file mode 100644 index 0000000..cfb3aaa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.ZonedDateTime; +import java.time.format.TextStyle; +import java.util.Locale; +import java.util.Objects; + +/** + * Response of the {@link DashboardConfigHandler} containing general configuration + * values such as the time zone and the refresh interval. + */ +public class DashboardConfiguration implements ResponseBody { + + public static final String FIELD_NAME_REFRESH_INTERVAL = "refresh-interval"; + public static final String FIELD_NAME_TIMEZONE_OFFSET = "timezone-offset"; + public static final String FIELD_NAME_TIMEZONE_NAME = "timezone-name"; + public static final String FIELD_NAME_FLINK_VERSION = "flink-version"; + public static final String FIELD_NAME_FLINK_REVISION = "flink-revision"; + + @JsonProperty(FIELD_NAME_REFRESH_INTERVAL) + private final long refreshInterval; + + @JsonProperty(FIELD_NAME_TIMEZONE_NAME) + private final String timeZoneName; + + @JsonProperty(FIELD_NAME_TIMEZONE_OFFSET) + private final int timeZoneOffset; + + @JsonProperty(FIELD_NAME_FLINK_VERSION) + private final String flinkVersion; + + @JsonProperty(FIELD_NAME_FLINK_REVISION) + private final String flinkRevision; + + @JsonCreator + public DashboardConfiguration( + @JsonProperty(FIELD_NAME_REFRESH_INTERVAL) long refreshInterval, + @JsonProperty(FIELD_NAME_TIMEZONE_NAME) String timeZoneName, + @JsonProperty(FIELD_NAME_TIMEZONE_OFFSET) int timeZoneOffset, + @JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion, + @JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision) { + this.refreshInterval = refreshInterval; + this.timeZoneName = Preconditions.checkNotNull(timeZoneName); + this.timeZoneOffset = timeZoneOffset; + this.flinkVersion = Preconditions.checkNotNull(flinkVersion); + this.flinkRevision = Preconditions.checkNotNull(flinkRevision); + } + + public long getRefreshInterval() { + return refreshInterval; + } + + public int getTimeZoneOffset() { + return timeZoneOffset; + } + + public String getTimeZoneName() { + return timeZoneName; + } + + public String getFlinkVersion() { + return flinkVersion; + } + + public String getFlinkRevision() { + return flinkRevision; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DashboardConfiguration that = (DashboardConfiguration) o; + return refreshInterval == that.refreshInterval && + timeZoneOffset == that.timeZoneOffset && + Objects.equals(timeZoneName, that.timeZoneName) && + Objects.equals(flinkVersion, that.flinkVersion) && + Objects.equals(flinkRevision, that.flinkRevision); + } + + @Override + public int hashCode() { + return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision); + } + + public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime) { + + final String flinkVersion = EnvironmentInformation.getVersion(); + + final EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); + final String flinkRevision; + + if (revision != null) { + flinkRevision = revision.commitId + " @ " + revision.commitDate; + } else { + flinkRevision = "unknown revision"; + } + + return new DashboardConfiguration( + refreshInterval, + zonedDateTime.getZone().getDisplayName(TextStyle.FULL, Locale.getDefault()), + // convert zone date time into offset in order to not do the day light saving adaptions wrt the offset + zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000, + flinkVersion, + flinkRevision); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java new file mode 100644 index 0000000..f001afc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.messages.webmonitor.StatusOverview; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Status overview message including the current Flink version and commit id. + */ +public class StatusOverviewWithVersion extends StatusOverview implements ResponseBody { + + private static final long serialVersionUID = 5000058311783413216L; + + public static final String FIELD_NAME_VERSION = "flink-version"; + public static final String FIELD_NAME_COMMIT = "flink-commit"; + + @JsonProperty(FIELD_NAME_VERSION) + private final String version; + + @JsonProperty(FIELD_NAME_COMMIT) + private final String commitId; + + @JsonCreator + public StatusOverviewWithVersion( + @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected, + @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal, + @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable, + @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending, + @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished, + @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled, + @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed, + @JsonProperty(FIELD_NAME_VERSION) String version, + @JsonProperty(FIELD_NAME_COMMIT) String commitId) { + super( + numTaskManagersConnected, + numSlotsTotal, + numSlotsAvailable, + numJobsRunningOrPending, + numJobsFinished, + numJobsCancelled, + numJobsFailed); + + this.version = Preconditions.checkNotNull(version); + this.commitId = Preconditions.checkNotNull(commitId); + } + + public StatusOverviewWithVersion( + int numTaskManagersConnected, + int numSlotsTotal, + int numSlotsAvailable, + JobsOverview jobs1, + JobsOverview jobs2, + String version, + String commitId) { + super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2); + + this.version = Preconditions.checkNotNull(version); + this.commitId = Preconditions.checkNotNull(commitId); + } + + public static StatusOverviewWithVersion fromStatusOverview(StatusOverview statusOverview, String version, String commitId) { + return new StatusOverviewWithVersion( + statusOverview.getNumTaskManagersConnected(), + statusOverview.getNumSlotsTotal(), + statusOverview.getNumSlotsAvailable(), + statusOverview.getNumJobsRunningOrPending(), + statusOverview.getNumJobsFinished(), + statusOverview.getNumJobsCancelled(), + statusOverview.getNumJobsFailed(), + version, + commitId); + } + + public String getVersion() { + return version; + } + + public String getCommitId() { + return commitId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + StatusOverviewWithVersion that = (StatusOverviewWithVersion) o; + + return Objects.equals(version, that.getVersion()) && Objects.equals(commitId, that.getCommitId()); + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (version != null ? version.hashCode() : 0); + result = 31 * result + (commitId != null ? commitId.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java index f0f98ec..887ce2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java new file mode 100644 index 0000000..cc03b7b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link DashboardConfigHandler}. + */ +public final class DashboardConfigurationHeaders implements MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> { + + private static final DashboardConfigurationHeaders INSTANCE = new DashboardConfigurationHeaders(); + + // make the constructor private since we want it to be a singleton + private DashboardConfigurationHeaders() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return DashboardConfigHandler.DASHBOARD_CONFIG_REST_PATH; + } + + @Override + public Class<DashboardConfiguration> getResponseClass() { + return DashboardConfiguration.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + public static DashboardConfigurationHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index 73cf063..e815a74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -24,7 +24,6 @@ import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.test.TestingServer; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java deleted file mode 100644 index d69049e..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.messages.webmonitor; - -import org.apache.flink.runtime.rest.util.RestMapperUtils; -import org.apache.flink.util.TestLogger; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * Tests for the {@link StatusOverviewWithVersion}. - */ -public class StatusOverviewWithVersionTest extends TestLogger { - - /** - * Tests that we can marshal and unmarshal StatusOverviewWithVersion. - */ - @Test - public void testJsonMarshalling() throws JsonProcessingException { - final StatusOverviewWithVersion expected = new StatusOverviewWithVersion( - 1, - 3, - 3, - 7, - 4, - 2, - 0, - "version", - "commit"); - - ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - - JsonNode json = objectMapper.valueToTree(expected); - - final StatusOverviewWithVersion unmarshalled = objectMapper.treeToValue(json, StatusOverviewWithVersion.class); - - assertEquals(expected, unmarshalled); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java index 06a99fe..73d9157 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java @@ -19,19 +19,20 @@ package org.apache.flink.runtime.rest.handler.legacy; import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; -import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.TestLogger; import com.fasterxml.jackson.databind.JsonNode; import org.junit.Assert; import org.junit.Test; -import java.util.TimeZone; +import java.time.ZonedDateTime; /** * Tests for the DashboardConfigHandler. */ -public class DashboardConfigHandlerTest { +public class DashboardConfigHandlerTest extends TestLogger { @Test public void testGetPaths() { DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L); @@ -43,17 +44,18 @@ public class DashboardConfigHandlerTest { @Test public void testJsonGeneration() throws Exception { long refreshInterval = 12345; - TimeZone timeZone = TimeZone.getDefault(); - EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); + final ZonedDateTime zonedDateTime = ZonedDateTime.now(); - String json = DashboardConfigHandler.createConfigJson(refreshInterval); + final DashboardConfiguration dashboardConfiguration = DashboardConfiguration.from(refreshInterval, zonedDateTime); + + String json = DashboardConfigHandler.createConfigJson(dashboardConfiguration); JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong()); - Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText()); - Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong()); - Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText()); - Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText()); + Assert.assertEquals(dashboardConfiguration.getTimeZoneName(), result.get("timezone-name").asText()); + Assert.assertEquals(dashboardConfiguration.getTimeZoneOffset(), result.get("timezone-offset").asInt()); + Assert.assertEquals(dashboardConfiguration.getFlinkVersion(), result.get("flink-version").asText()); + Assert.assertEquals(dashboardConfiguration.getFlinkRevision(), result.get("flink-revision").asText()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java new file mode 100644 index 0000000..9a9046b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link DashboardConfiguration}. + */ +public class DashboardConfigurationTest extends TestLogger { + + /** + * Tests that we can marshal and unmarshal {@link DashboardConfiguration} objects. + */ + @Test + public void testJsonMarshalling() throws JsonProcessingException { + final DashboardConfiguration expected = new DashboardConfiguration( + 1L, + "foobar", + 42, + "version", + "revision"); + + final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + JsonNode marshaled = objectMapper.valueToTree(expected); + + final DashboardConfiguration unmarshaled = objectMapper.treeToValue(marshaled, DashboardConfiguration.class); + + assertEquals(expected, unmarshaled); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java new file mode 100644 index 0000000..a1bbc9a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link StatusOverviewWithVersion}. + */ +public class StatusOverviewWithVersionTest extends TestLogger { + + /** + * Tests that we can marshal and unmarshal StatusOverviewWithVersion. + */ + @Test + public void testJsonMarshalling() throws JsonProcessingException { + final StatusOverviewWithVersion expected = new StatusOverviewWithVersion( + 1, + 3, + 3, + 7, + 4, + 2, + 0, + "version", + "commit"); + + ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + JsonNode json = objectMapper.valueToTree(expected); + + final StatusOverviewWithVersion unmarshalled = objectMapper.treeToValue(json, StatusOverviewWithVersion.class); + + assertEquals(expected, unmarshalled); + } +}
