http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java new file mode 100644 index 0000000..da115ee --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.files; + +/***************************************************************************** + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. 
+ * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + *****************************************************************************/ + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.RedirectHandler; +import org.apache.flink.runtime.rest.handler.util.MimeTypes; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile; +import 
org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Simple file server handler that serves requests to web frontend's static files, 
such as + * HTML, CSS, or JS files. + * + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example.</p> + */ +@ChannelHandler.Sharable +public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> { + + /** Timezone in which this server answers its "if-modified" requests. */ + private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); + + /** Date format for HTTP. */ + public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; + + /** Be default, we allow files to be cached for 5 minutes. */ + private static final int HTTP_CACHE_SECONDS = 300; + + // ------------------------------------------------------------------------ + + /** The path in which the static documents are. */ + private final File rootPath; + + public StaticFileServerHandler( + GatewayRetriever<T> retriever, + CompletableFuture<String> localJobManagerAddressFuture, + Time timeout, + File rootPath) throws IOException { + + super(localJobManagerAddressFuture, retriever, timeout); + + this.rootPath = checkNotNull(rootPath).getCanonicalFile(); + } + + // ------------------------------------------------------------------------ + // Responses to requests + // ------------------------------------------------------------------------ + + @Override + protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception { + final HttpRequest request = routed.request(); + final String requestPath; + + // make sure we request the "index.html" in case there is a directory request + if (routed.path().endsWith("/")) { + requestPath = routed.path() + "index.html"; + } + // in case the files being accessed are logs or stdout files, find appropriate paths. 
+ else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) { + requestPath = ""; + } else { + requestPath = routed.path(); + } + + respondToRequest(channelHandlerContext, request, requestPath); + } + + /** + * Response when running with leading JobManager. + */ + private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, String requestPath) + throws IOException, ParseException, URISyntaxException { + + // convert to absolute path + final File file = new File(rootPath, requestPath); + + if (!file.exists()) { + // file does not exist. Try to load it with the classloader + ClassLoader cl = StaticFileServerHandler.class.getClassLoader(); + + try (InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) { + boolean success = false; + try { + if (resourceStream != null) { + URL root = cl.getResource("web"); + URL requested = cl.getResource("web" + requestPath); + + if (root != null && requested != null) { + URI rootURI = new URI(root.getPath()).normalize(); + URI requestedURI = new URI(requested.getPath()).normalize(); + + // Check that we don't load anything from outside of the + // expected scope. + if (!rootURI.relativize(requestedURI).equals(requestedURI)) { + logger.debug("Loading missing file from classloader: {}", requestPath); + // ensure that directory to file exists. 
+ file.getParentFile().mkdirs(); + Files.copy(resourceStream, file.toPath()); + + success = true; + } + } + } + } catch (Throwable t) { + logger.error("error while responding", t); + } finally { + if (!success) { + logger.debug("Unable to load requested file {} from classloader", requestPath); + sendError(ctx, NOT_FOUND); + return; + } + } + } + } + + if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) { + sendError(ctx, NOT_FOUND); + return; + } + + if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) { + sendError(ctx, NOT_FOUND); + return; + } + + // cache validation + final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE); + if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) { + SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); + Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince); + + // Only compare up to the second because the datetime format we send to the client + // does not have milliseconds + long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000; + long fileLastModifiedSeconds = file.lastModified() / 1000; + if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { + if (logger.isDebugEnabled()) { + logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\''); + } + + sendNotModified(ctx); + return; + } + } + + if (logger.isDebugEnabled()) { + logger.debug("Responding with file '" + file.getAbsolutePath() + '\''); + } + + // Don't need to close this manually. Netty's DefaultFileRegion will take care of it. 
+ final RandomAccessFile raf; + try { + raf = new RandomAccessFile(file, "r"); + } + catch (FileNotFoundException e) { + sendError(ctx, NOT_FOUND); + return; + } + + try { + long fileLength = raf.length(); + + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + setContentTypeHeader(response, file); + + // since the log and out files are rapidly changing, we don't want to browser to cache them + if (!(requestPath.contains("log") || requestPath.contains("out"))) { + setDateAndCacheHeaders(response, file); + } + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + HttpHeaders.setContentLength(response, fileLength); + + // write the initial line and the header. + ctx.write(response); + + // write the content. + ChannelFuture lastContentFuture; + if (ctx.pipeline().get(SslHandler.class) == null) { + ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); + lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } else { + lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), + ctx.newProgressivePromise()); + // HttpChunkedInput will write the end marker (LastHttpContent) for us. 
+ } + + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } catch (Exception e) { + raf.close(); + logger.error("Failed to serve file.", e); + sendError(ctx, INTERNAL_SERVER_ERROR); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (ctx.channel().isActive()) { + logger.error("Caught exception", cause); + sendError(ctx, INTERNAL_SERVER_ERROR); + } + } + + // ------------------------------------------------------------------------ + // Utilities to encode headers and responses + // ------------------------------------------------------------------------ + + /** + * Writes a simple error response message. + * + * @param ctx The channel context to write the response to. + * @param status The response status. + */ + public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { + FullHttpResponse response = new DefaultFullHttpResponse( + HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8)); + response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); + + // close the connection as soon as the error message is sent. + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + + /** + * Send the "304 Not Modified" response. This response can be used when the + * file timestamp is the same as what the browser is sending up. + * + * @param ctx The channel context to write the response to. + */ + public static void sendNotModified(ChannelHandlerContext ctx) { + FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED); + setDateHeader(response); + + // close the connection as soon as the error message is sent. + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + + /** + * Sets the "date" header for the HTTP response. 
+ * + * @param response HTTP response + */ + public static void setDateHeader(FullHttpResponse response) { + SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); + dateFormatter.setTimeZone(GMT_TIMEZONE); + + Calendar time = new GregorianCalendar(); + response.headers().set(DATE, dateFormatter.format(time.getTime())); + } + + /** + * Sets the "date" and "cache" headers for the HTTP Response. + * + * @param response The HTTP response object. + * @param fileToCache File to extract the modification timestamp from. + */ + public static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) { + SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); + dateFormatter.setTimeZone(GMT_TIMEZONE); + + // date header + Calendar time = new GregorianCalendar(); + response.headers().set(DATE, dateFormatter.format(time.getTime())); + + // cache headers + time.add(Calendar.SECOND, HTTP_CACHE_SECONDS); + response.headers().set(EXPIRES, dateFormatter.format(time.getTime())); + response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS); + response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified()))); + } + + /** + * Sets the content type header for the HTTP Response. + * + * @param response HTTP response + * @param file file to extract content type + */ + public static void setContentTypeHeader(HttpResponse response, File file) { + String mimeType = MimeTypes.getMimeTypeForFileName(file.getName()); + String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType(); + response.headers().set(CONTENT_TYPE, mimeFinal); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java new file mode 100644 index 0000000..315bdc2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.concurrent.FlinkFutureException; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Abstract request handler that returns a list of all available metrics or the values for a set of metrics. + * + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * {@code /get?X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler { + private final MetricFetcher fetcher; + + public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor); + this.fetcher = Preconditions.checkNotNull(fetcher); + } + + @Override + public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) { + return CompletableFuture.supplyAsync( + () -> { + fetcher.update(); + String requestedMetricsList = queryParams.get("get"); + try { + return requestedMetricsList != null + ? 
getMetricsValues(pathParams, requestedMetricsList) + : getAvailableMetricsList(pathParams); + } catch (IOException e) { + throw new FlinkFutureException("Could not retrieve metrics.", e); + } + }, + executor); + + } + + /** + * Returns a Map containing the metrics belonging to the entity pointed to by the path parameters. + * + * @param pathParams REST path parameters + * @param metrics MetricStore containing all metrics + * @return Map containing metrics, or null if no metric exists + */ + protected abstract Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics); + + private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException { + if (requestedMetricsList.isEmpty()) { + /* + * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a + * request for which the "get" parameter is an empty string. + */ + return ""; + } + MetricStore metricStore = fetcher.getMetricStore(); + synchronized (metricStore) { + Map<String, String> metrics = getMapFor(pathParams, metricStore); + if (metrics == null) { + return ""; + } + String[] requestedMetrics = requestedMetricsList.split(","); + + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartArray(); + for (String requestedMetric : requestedMetrics) { + Object metricValue = metrics.get(requestedMetric); + if (metricValue != null) { + gen.writeStartObject(); + gen.writeStringField("id", requestedMetric); + gen.writeStringField("value", metricValue.toString()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } + } + + private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException { + MetricStore metricStore = fetcher.getMetricStore(); + synchronized (metricStore) { + Map<String, String> metrics = getMapFor(pathParams, metricStore); + if (metrics == null) 
{ + return ""; + } + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartArray(); + for (String m : metrics.keySet()) { + gen.writeStartObject(); + gen.writeStringField("id", m); + gen.writeEndObject(); + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java new file mode 100644 index 0000000..c568ee0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics. + * + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } + * + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * {@code /get?X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public class JobManagerMetricsHandler extends AbstractMetricsHandler { + + private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics"; + + public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); + } + + @Override + public String[] getPaths() { + return new String[]{JOBMANAGER_METRICS_REST_PATH}; + } + + @Override + protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore(); + if (jobManager == null) { + return null; + } else { + return jobManager.metrics; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java new file mode 100644 index 0000000..7341eb8 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. + * + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } + * + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * {@code /get?X,Y} + * The handler will then return a list containing the values of the requested metrics. 
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public class JobMetricsHandler extends AbstractMetricsHandler { + public static final String PARAMETER_JOB_ID = "jobid"; + private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics"; + + public JobMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_METRICS_REST_PATH}; + } + + @Override + protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); + return job != null + ? job.metrics + : null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java new file mode 100644 index 0000000..3a701ab --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics. + * + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } + * + * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value. + * {@code /get?X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public class JobVertexMetricsHandler extends AbstractMetricsHandler { + public static final String PARAMETER_VERTEX_ID = "vertexid"; + private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics"; + + public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) { + super(executor, fetcher); + } + + @Override + public String[] getPaths() { + return new String[]{JOB_VERTEX_METRICS_REST_PATH}; + } + + @Override + protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + MetricStore.TaskMetricStore task = metrics.getTaskMetricStore( + pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID), + pathParams.get(PARAMETER_VERTEX_ID)); + return task != null + ? 
task.metrics + : null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java new file mode 100644 index 0000000..9f53808 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer; + +/** + * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers. + * + * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since + * the last call has passed. 
+ */
public class MetricFetcher {
	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);

	private final GatewayRetriever<JobManagerGateway> retriever;
	private final MetricQueryServiceRetriever queryServiceRetriever;
	private final Executor executor;
	private final Time timeout;

	private final MetricStore metrics = new MetricStore();
	private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();

	private long lastUpdateTime;

	/**
	 * Creates a new fetcher. None of the arguments may be null.
	 *
	 * @param retriever used to look up the current {@link JobManagerGateway}
	 * @param queryServiceRetriever used to resolve a query service path into a {@link MetricQueryServiceGateway}
	 * @param executor executor on which all asynchronous callbacks of this fetcher run
	 * @param timeout timeout handed to every gateway request issued by this fetcher
	 */
	public MetricFetcher(
			GatewayRetriever<JobManagerGateway> retriever,
			MetricQueryServiceRetriever queryServiceRetriever,
			Executor executor,
			Time timeout) {
		this.retriever = Preconditions.checkNotNull(retriever);
		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
		this.executor = Preconditions.checkNotNull(executor);
		this.timeout = Preconditions.checkNotNull(timeout);
	}

	/**
	 * Gives access to the store into which all fetched metrics are written.
	 *
	 * @return the MetricStore holding every metric fetched so far
	 */
	public MetricStore getMetricStore() {
		return metrics;
	}

	/**
	 * Tells this fetcher that the metrics are still being used. A new fetch is started only if more than
	 * ten seconds have elapsed since the previous one; otherwise the call is a no-op.
	 */
	public void update() {
		synchronized (this) {
			final long now = System.currentTimeMillis();
			if (now - lastUpdateTime <= 10000) {
				// the last update happened at most 10 seconds ago
				return;
			}
			lastUpdateTime = now;
			fetchMetrics();
		}
	}

	/**
	 * Starts one asynchronous fetch cycle: prunes metrics of jobs the JobManager no longer reports, queries the
	 * JobManager's metrics and finally queries every registered TaskManager. Does nothing if no JobManager
	 * gateway is currently available. Any exception raised while setting up the requests is logged and swallowed.
	 */
	private void fetchMetrics() {
		try {
			final Optional<JobManagerGateway> gateway = retriever.getNow();
			if (!gateway.isPresent()) {
				return;
			}
			final JobManagerGateway jobManagerGateway = gateway.get();

			pruneMetricsOfUnknownJobs(jobManagerGateway);

			final String jmQueryServicePath =
				parentPath(jobManagerGateway.getAddress()) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
			retrieveAndQueryMetrics(jmQueryServicePath);

			fetchTaskManagerMetrics(jobManagerGateway);
		} catch (Exception e) {
			LOG.warn("Exception while fetching metrics.", e);
		}
	}

	/**
	 * Removes all metrics that belong to a job that is neither running nor finished according to the
	 * job details reported by the given gateway.
	 *
	 * @param jobManagerGateway gateway asked for the job details
	 */
	private void pruneMetricsOfUnknownJobs(JobManagerGateway jobManagerGateway) {
		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);

		jobDetailsFuture.whenCompleteAsync(
			(jobDetails, failure) -> {
				if (failure != null) {
					LOG.debug("Fetching of JobDetails failed.", failure);
					return;
				}

				List<String> knownJobIds = new ArrayList<>();
				for (JobDetails runningJob : jobDetails.getRunningJobs()) {
					knownJobIds.add(runningJob.getJobId().toString());
				}
				for (JobDetails finishedJob : jobDetails.getFinishedJobs()) {
					knownJobIds.add(finishedJob.getJobId().toString());
				}

				synchronized (metrics) {
					metrics.jobs.keySet().retainAll(knownJobIds);
				}
			},
			executor);
	}

	/**
	 * Requests the list of all registered task managers from the job manager and then requests the metric dump
	 * of each of them. All stored metrics that do not belong to a registered task manager are removed.
	 *
	 * @param jobManagerGateway gateway asked for the registered task manager instances
	 */
	private void fetchTaskManagerMetrics(JobManagerGateway jobManagerGateway) {
		CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);

		taskManagersFuture.whenCompleteAsync(
			(taskManagers, failure) -> {
				if (failure != null) {
					LOG.debug("Fetching list of registered TaskManagers failed.", failure);
					return;
				}

				List<String> activeTaskManagers = taskManagers.stream()
					.map(this::queryTaskManagerMetrics)
					.collect(Collectors.toList());

				synchronized (metrics) {
					metrics.taskManagers.keySet().retainAll(activeTaskManagers);
				}
			},
			executor);
	}

	/**
	 * Triggers a metric query against the query service of the given task manager.
	 *
	 * @param taskManager task manager instance to query
	 * @return string form of the instance ID of the given task manager
	 */
	private String queryTaskManagerMetrics(Instance taskManager) {
		final String address = taskManager.getTaskManagerGateway().getAddress();
		final String tmQueryServicePath = parentPath(address)
			+ MetricQueryService.METRIC_QUERY_SERVICE_NAME
			+ "_"
			+ taskManager.getTaskManagerID().getResourceIdString();

		retrieveAndQueryMetrics(tmQueryServicePath);

		return taskManager.getId().toString();
	}

	/**
	 * Cuts the given address after its last {@code '/'}. If the address contains no slash the result is empty.
	 *
	 * @param address address to shorten
	 * @return the address up to and including its last slash
	 */
	private static String parentPath(String address) {
		return address.substring(0, address.lastIndexOf('/') + 1);
	}

	/**
	 * Resolves the QueryServiceGateway registered under the given path and, once available, queries it.
	 *
	 * @param queryServicePath path identifying the QueryServiceGateway
	 */
	private void retrieveAndQueryMetrics(String queryServicePath) {
		final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);

		queryServiceGatewayFuture.whenCompleteAsync(
			(queryServiceGateway, failure) -> {
				if (failure == null) {
					queryMetrics(queryServiceGateway);
				} else {
					LOG.debug("Could not retrieve QueryServiceGateway.", failure);
				}
			},
			executor);
	}

	/**
	 * Asks the given QueryServiceGateway for its metrics and adds every deserialized dump to the store.
	 *
	 * @param queryServiceGateway gateway to query for metrics
	 */
	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
		queryServiceGateway
			.queryMetrics(timeout)
			.whenCompleteAsync(
				(MetricDumpSerialization.MetricSerializationResult result, Throwable failure) -> {
					if (failure != null) {
						LOG.debug("Fetching metrics failed.", failure);
						return;
					}

					List<MetricDump> dumps = deserializer.deserialize(result);
					synchronized (metrics) {
						for (MetricDump dump : dumps) {
							metrics.add(dump);
						}
					}
				},
				executor);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java new file mode 100644 index 0000000..6d3fc99 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; + +/** + * Nested data-structure to store metrics. + * + * <p>This structure is not thread-safe. 
+ */
public class MetricStore {
	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);

	/** Name prefix shared by all garbage collector metrics of a TaskManager. */
	private static final String GARBAGE_COLLECTOR_PREFIX = "Status.JVM.GarbageCollector.";

	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
	final Map<String, JobMetricStore> jobs = new HashMap<>();

	// -----------------------------------------------------------------------------------------------------------------
	// Adding metrics
	// -----------------------------------------------------------------------------------------------------------------

	/**
	 * Adds the given metric dump to the sub-store selected by the category of its scope info, creating missing
	 * sub-stores on the way.
	 *
	 * <p>The stored name is the metric name prefixed with its scope (if the scope is non-empty). Dumps with an
	 * empty resulting name or an unknown scope category are ignored. Any exception raised while processing the
	 * dump is logged on debug level and swallowed, so this method never throws.
	 *
	 * @param metric metric dump to add
	 */
	public void add(MetricDump metric) {
		try {
			QueryScopeInfo info = metric.scopeInfo;

			String name = info.scope.isEmpty()
				? metric.name
				: info.scope + "." + metric.name;

			if (name.isEmpty()) { // malformed transmission
				return;
			}

			switch (info.getCategory()) {
				case INFO_CATEGORY_JM:
					addMetric(jobManager.metrics, name, metric);
					break;
				case INFO_CATEGORY_TM:
					addTaskManagerMetric((QueryScopeInfo.TaskManagerQueryScopeInfo) info, name, metric);
					break;
				case INFO_CATEGORY_JOB:
					addMetric(getOrCreateJob(((QueryScopeInfo.JobQueryScopeInfo) info).jobID).metrics, name, metric);
					break;
				case INFO_CATEGORY_TASK:
					addTaskMetric((QueryScopeInfo.TaskQueryScopeInfo) info, name, metric);
					break;
				case INFO_CATEGORY_OPERATOR:
					addOperatorMetric((QueryScopeInfo.OperatorQueryScopeInfo) info, name, metric);
					break;
				default:
					LOG.debug("Invalid metric dump category: {}", info.getCategory());
			}
		} catch (Exception e) {
			LOG.debug("Malformed metric dump.", e);
		}
	}

	/**
	 * Stores a TaskManager-scoped metric and, for garbage collector metrics, records the collector name.
	 */
	private void addTaskManagerMetric(QueryScopeInfo.TaskManagerQueryScopeInfo info, String name, MetricDump metric) {
		TaskManagerMetricStore tm = taskManagers.computeIfAbsent(info.taskManagerID, ignored -> new TaskManagerMetricStore());

		// Garbage collector metrics are named "Status.JVM.GarbageCollector.<collector>.<metric>". Only names of
		// exactly that shape carry a collector name; anything else is stored as a plain metric instead of
		// failing in substring() and being dropped.
		if (name.startsWith(GARBAGE_COLLECTOR_PREFIX)) {
			int collectorNameEnd = name.lastIndexOf('.');
			if (collectorNameEnd > GARBAGE_COLLECTOR_PREFIX.length()) {
				tm.addGarbageCollectorName(name.substring(GARBAGE_COLLECTOR_PREFIX.length(), collectorNameEnd));
			}
		}

		addMetric(tm.metrics, name, metric);
	}

	/**
	 * Stores a task-scoped metric both in the subtask store and in the task store.
	 */
	private void addTaskMetric(QueryScopeInfo.TaskQueryScopeInfo info, String name, MetricDump metric) {
		TaskMetricStore task = getOrCreateTask(getOrCreateJob(info.jobID), info.vertexID);
		SubtaskMetricStore subtask = task.subtasks.computeIfAbsent(info.subtaskIndex, ignored -> new SubtaskMetricStore());

		// The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers,
		// while the WebInterface task metric queries currently do not account for subtasks, so we don't
		// divide by subtask and instead use the concatenation of subtask index and metric name as the name
		// for those.
		addMetric(subtask.metrics, name, metric);
		addMetric(task.metrics, info.subtaskIndex + "." + name, metric);
	}

	/**
	 * Stores an operator-scoped metric in the store of the task the operator belongs to.
	 */
	private void addOperatorMetric(QueryScopeInfo.OperatorQueryScopeInfo info, String name, MetricDump metric) {
		TaskMetricStore task = getOrCreateTask(getOrCreateJob(info.jobID), info.vertexID);

		// As the WebInterface does not account for operators (because it can't) we don't divide by operator
		// and instead use the concatenation of subtask index, operator name and metric name as the name.
		addMetric(task.metrics, info.subtaskIndex + "." + info.operatorName + "." + name, metric);
	}

	/**
	 * Returns the store of the given job, creating and registering an empty one if none exists yet.
	 */
	private JobMetricStore getOrCreateJob(String jobID) {
		return jobs.computeIfAbsent(jobID, ignored -> new JobMetricStore());
	}

	/**
	 * Returns the store of the given task within the given job, creating and registering an empty one if none
	 * exists yet.
	 */
	private static TaskMetricStore getOrCreateTask(JobMetricStore job, String vertexID) {
		return job.tasks.computeIfAbsent(vertexID, ignored -> new TaskMetricStore());
	}

	/**
	 * Writes the string form of the given dump into the target map. Counters, gauges and meters yield a single
	 * entry under {@code name}; histograms yield one entry per statistic under {@code name} plus a suffix.
	 * Dumps of any other category are ignored.
	 */
	private void addMetric(Map<String, String> target, String name, MetricDump metric) {
		switch (metric.getCategory()) {
			case METRIC_CATEGORY_COUNTER:
				MetricDump.CounterDump counter = (MetricDump.CounterDump) metric;
				target.put(name, String.valueOf(counter.count));
				break;
			case METRIC_CATEGORY_GAUGE:
				MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric;
				target.put(name, gauge.value);
				break;
			case METRIC_CATEGORY_HISTOGRAM:
				MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric;
				target.put(name + "_min", String.valueOf(histogram.min));
				target.put(name + "_max", String.valueOf(histogram.max));
				target.put(name + "_mean", String.valueOf(histogram.mean));
				target.put(name + "_median", String.valueOf(histogram.median));
				target.put(name + "_stddev", String.valueOf(histogram.stddev));
				target.put(name + "_p75", String.valueOf(histogram.p75));
				target.put(name + "_p90", String.valueOf(histogram.p90));
				target.put(name + "_p95", String.valueOf(histogram.p95));
				target.put(name + "_p98", String.valueOf(histogram.p98));
				target.put(name + "_p99", String.valueOf(histogram.p99));
				target.put(name + "_p999", String.valueOf(histogram.p999));
				break;
			case METRIC_CATEGORY_METER:
				MetricDump.MeterDump meter = (MetricDump.MeterDump) metric;
				target.put(name, String.valueOf(meter.rate));
				break;
		}
	}

	// -----------------------------------------------------------------------------------------------------------------
	// Accessors for sub MetricStores
	// -----------------------------------------------------------------------------------------------------------------

	/**
	 * Returns the {@link JobManagerMetricStore}.
	 *
	 * @return JobManagerMetricStore
	 */
	public JobManagerMetricStore getJobManagerMetricStore() {
		return jobManager;
	}

	/**
	 * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID.
	 *
	 * @param tmID taskmanager ID
	 * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
	 */
	public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
		return taskManagers.get(tmID);
	}

	/**
	 * Returns the {@link JobMetricStore} for the given job ID.
	 *
	 * @param jobID job ID
	 * @return JobMetricStore for the given ID, or null if no store for the given argument exists
	 */
	public JobMetricStore getJobMetricStore(String jobID) {
		return jobs.get(jobID);
	}

	/**
	 * Returns the {@link TaskMetricStore} for the given job/task ID.
	 *
	 * @param jobID job ID
	 * @param taskID task ID
	 * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
	 */
	public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
		JobMetricStore job = getJobMetricStore(jobID);
		if (job == null) {
			return null;
		}
		return job.getTaskMetricStore(taskID);
	}

	/**
	 * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index.
	 *
	 * @param jobID job ID
	 * @param taskID task ID
	 * @param subtaskIndex subtask index
	 * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists
	 */
	public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) {
		TaskMetricStore task = getTaskMetricStore(jobID, taskID);
		if (task == null) {
			return null;
		}
		return task.getSubtaskMetricStore(subtaskIndex);
	}

	// -----------------------------------------------------------------------------------------------------------------
	// sub MetricStore classes
	// -----------------------------------------------------------------------------------------------------------------

	/**
	 * Base of all sub-stores: a flat map from metric name to the metric's string value.
	 */
	private abstract static class ComponentMetricStore {
		public final Map<String, String> metrics = new HashMap<>();

		/**
		 * Looks up a metric by name.
		 *
		 * @param name name of the metric
		 * @param defaultValue value returned if no value is stored under the given name
		 * @return the stored value, or {@code defaultValue} if there is none
		 */
		public String getMetric(String name, String defaultValue) {
			String value = this.metrics.get(name);
			return value != null
				? value
				: defaultValue;
		}
	}

	/**
	 * Sub-structure containing metrics of the JobManager.
	 */
	public static class JobManagerMetricStore extends ComponentMetricStore {
	}

	/**
	 * Sub-structure containing metrics of a single TaskManager.
	 */
	public static class TaskManagerMetricStore extends ComponentMetricStore {
		public final Set<String> garbageCollectorNames = new HashSet<>();

		public void addGarbageCollectorName(String name) {
			garbageCollectorNames.add(name);
		}
	}

	/**
	 * Sub-structure containing metrics of a single Job.
	 */
	public static class JobMetricStore extends ComponentMetricStore {
		private final Map<String, TaskMetricStore> tasks = new HashMap<>();

		public TaskMetricStore getTaskMetricStore(String taskID) {
			return tasks.get(taskID);
		}
	}

	/**
	 * Sub-structure containing metrics of a single Task.
	 */
	public static class TaskMetricStore extends ComponentMetricStore {
		private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>();

		public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
			return subtasks.get(subtaskIndex);
		}
	}

	/**
	 * Sub-structure containing metrics of a single Subtask.
	 */
	public static class SubtaskMetricStore extends ComponentMetricStore {
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java new file mode 100644 index 0000000..90bafb7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.metrics; + +import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler; + +import java.util.Map; +import java.util.concurrent.Executor; +
/**
 * Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics.
 *
 * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
 * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
 *
 * <p>If the query parameters do contain a "get" parameter a comma-separated list of metric names is expected as a value.
 * {@code /get?X,Y}
 * The handler will then return a list containing the values of the requested metrics.
 * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
 */
public class TaskManagerMetricsHandler extends AbstractMetricsHandler {

	private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics";

	public TaskManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
		super(executor, fetcher);
	}

	@Override
	public String[] getPaths() {
		return new String[]{TASKMANAGER_METRICS_REST_PATH};
	}

	/**
	 * Selects the metric map of the task manager named by the path parameters.
	 *
	 * @param pathParams path parameters; the task manager ID is read from {@link TaskManagersHandler#TASK_MANAGER_ID_KEY}
	 * @param metrics store to look the task manager up in
	 * @return the metrics of that task manager, or null if the store knows no such task manager
	 */
	@Override
	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
		final String taskManagerId = pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
		final MetricStore.TaskManagerMetricStore store = metrics.getTaskManagerMetricStore(taskManagerId);
		return store != null ? store.metrics : null;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java new
file mode 100644 index 0000000..e2aaaf7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.util; + +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; + +import com.fasterxml.jackson.core.JsonGenerator; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics. + * + * <p>For finished jobs these metrics are stored in the {@link ExecutionGraph} as another {@link IOMetrics}. + * For running jobs these metrics are retrieved using the {@link MetricFetcher}. 
+ * + * <p>This class provides a common interface to handle both cases, reducing complexity in various handlers (like + * the {@link JobVertexDetailsHandler}). + */
public class MutableIOMetrics extends IOMetrics {

	private static final long serialVersionUID = -5460777634971381737L;

	/**
	 * Creates an instance with all metrics set to zero.
	 */
	public MutableIOMetrics() {
		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
	}

	/**
	 * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
	 * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is
	 * used to retrieve the required metrics.
	 *
	 * <p>Nothing is added if a terminal attempt carries no {@link IOMetrics}, if the attempt is not terminal and
	 * no fetcher was given, or if the fetcher's store has no entry for the subtask.
	 *
	 * @param attempt Attempt whose IO metrics should be added
	 * @param fetcher MetricFetcher to retrieve metrics for running jobs; may be null
	 * @param jobID JobID to which the attempt belongs
	 * @param taskID TaskID to which the attempt belongs
	 * @throws NumberFormatException if a fetched metric value is not a parsable long
	 */
	public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) {
		if (attempt.getState().isTerminal()) {
			IOMetrics ioMetrics = attempt.getIOMetrics();
			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
				this.numBytesInLocal += ioMetrics.getNumBytesInLocal();
				this.numBytesInRemote += ioMetrics.getNumBytesInRemote();
				this.numBytesOut += ioMetrics.getNumBytesOut();
				this.numRecordsIn += ioMetrics.getNumRecordsIn();
				this.numRecordsOut += ioMetrics.getNumRecordsOut();
			}
		} else { // execAttempt is still running, use MetricQueryService instead
			if (fetcher != null) {
				fetcher.update();
				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
				if (metrics != null) {
					this.numBytesInLocal += getLongMetric(metrics, MetricNames.IO_NUM_BYTES_IN_LOCAL);
					this.numBytesInRemote += getLongMetric(metrics, MetricNames.IO_NUM_BYTES_IN_REMOTE);
					this.numBytesOut += getLongMetric(metrics, MetricNames.IO_NUM_BYTES_OUT);
					this.numRecordsIn += getLongMetric(metrics, MetricNames.IO_NUM_RECORDS_IN);
					this.numRecordsOut += getLongMetric(metrics, MetricNames.IO_NUM_RECORDS_OUT);
				}
			}
		}
	}

	/**
	 * Reads the named metric from the given store as a primitive long, treating a missing metric as zero.
	 * Uses {@link Long#parseLong(String)} so no intermediate {@link Long} object is created.
	 */
	private static long getLongMetric(MetricStore.SubtaskMetricStore metrics, String name) {
		return Long.parseLong(metrics.getMetric(name, "0"));
	}

	/**
	 * Writes the IO metrics contained in this object to the given {@link JsonGenerator}.
	 *
	 * <p>The JSON structure written is as follows:
	 * "metrics": {
	 *     "read-bytes": 1,
	 *     "write-bytes": 2,
	 *     "read-records": 3,
	 *     "write-records": 4
	 * }
	 *
	 * <p>"read-bytes" is the sum of the locally and remotely read bytes.
	 *
	 * @param gen JsonGenerator to which the metrics should be written
	 * @throws IOException if writing to the generator fails
	 */
	public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
		gen.writeObjectFieldStart("metrics");
		gen.writeNumberField("read-bytes", this.numBytesInLocal + this.numBytesInRemote);
		gen.writeNumberField("write-bytes", this.numBytesOut);
		gen.writeNumberField("read-records", this.numRecordsIn);
		gen.writeNumberField("write-records", this.numRecordsOut);
		gen.writeEndObject();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java new file mode 100644 index 0000000..5bfa1f9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandlerTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Executors; + +import org.junit.Assert; +import org.junit.Test; +
/**
 * Tests for the ClusterOverviewHandler.
 */
public class ClusterOverviewHandlerTest {

	/**
	 * The handler must register exactly one path, "/overview".
	 */
	@Test
	public void testGetPaths() {
		final String[] registeredPaths =
			new ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L)).getPaths();

		Assert.assertEquals(1, registeredPaths.length);
		Assert.assertEquals("/overview", registeredPaths[0]);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java new file mode 100644 index 0000000..0ada30d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandlerTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Executors; + +import org.junit.Assert; +import org.junit.Test; +
/**
 * Tests for the CurrentJobIdsHandler.
 */
public class CurrentJobIdsHandlerTest {

	/**
	 * The handler must register exactly one path, "/jobs".
	 */
	@Test
	public void testGetPaths() {
		final String[] registeredPaths =
			new CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L)).getPaths();

		Assert.assertEquals(1, registeredPaths.length);
		Assert.assertEquals("/jobs", registeredPaths[0]);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java new file mode 100644 index 0000000..83bb157 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collection; + +/** + * Tests for the CurrentJobsOverviewHandler. 
+ */
public class CurrentJobsOverviewHandlerTest {

	/**
	 * Archiving the test job must yield a single "/joboverview" archive that lists the job as finished.
	 */
	@Test
	public void testArchiver() throws Exception {
		JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
		AccessExecutionGraph testJob = ArchivedJobGenerationUtils.getTestJob();
		JobDetails expected = WebMonitorUtils.createDetailsForJob(testJob);

		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(testJob);
		Assert.assertEquals(1, archives.size());

		ArchivedJson onlyArchive = archives.iterator().next();
		Assert.assertEquals("/joboverview", onlyArchive.getPath());

		JsonNode root = ArchivedJobGenerationUtils.MAPPER.readTree(onlyArchive.getJson());

		ArrayNode runningJobs = (ArrayNode) root.get("running");
		Assert.assertEquals(0, runningJobs.size());

		ArrayNode finishedJobs = (ArrayNode) root.get("finished");
		Assert.assertEquals(1, finishedJobs.size());

		compareJobOverview(expected, finishedJobs.get(0).toString());
	}

	/**
	 * The registered path depends on the two boolean constructor flags.
	 */
	@Test
	public void testGetPaths() {
		assertSinglePath(true, true, "/joboverview");
		assertSinglePath(true, false, "/joboverview/running");
		assertSinglePath(false, true, "/joboverview/completed");
	}

	/**
	 * Serializing job details directly must produce JSON matching those details.
	 */
	@Test
	public void testJsonGeneration() throws Exception {
		AccessExecutionGraph testJob = ArchivedJobGenerationUtils.getTestJob();
		JobDetails expected = WebMonitorUtils.createDetailsForJob(testJob);

		StringWriter json = new StringWriter();
		try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(json)) {
			CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expected, gen, 0);
		}

		compareJobOverview(expected, json.toString());
	}

	/**
	 * Builds a handler with the given flags and checks that it registers exactly the expected path.
	 */
	private static void assertSinglePath(boolean firstFlag, boolean secondFlag, String expectedPath) {
		CurrentJobsOverviewHandler handler =
			new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), firstFlag, secondFlag);

		String[] paths = handler.getPaths();
		Assert.assertEquals(1, paths.length);
		Assert.assertEquals(expectedPath, paths[0]);
	}

	/**
	 * Returns the number of vertices counted for the given state.
	 */
	private static int count(int[] tasksPerState, ExecutionState state) {
		return tasksPerState[state.ordinal()];
	}

	/**
	 * Parses the given JSON and checks every field against the expected job details.
	 */
	private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
		JsonNode json = ArchivedJobGenerationUtils.MAPPER.readTree(answer);

		Assert.assertEquals(expectedDetails.getJobId().toString(), json.get("jid").asText());
		Assert.assertEquals(expectedDetails.getJobName(), json.get("name").asText());
		Assert.assertEquals(expectedDetails.getStatus().name(), json.get("state").asText());

		Assert.assertEquals(expectedDetails.getStartTime(), json.get("start-time").asLong());
		Assert.assertEquals(expectedDetails.getEndTime(), json.get("end-time").asLong());
		Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), json.get("duration").asLong());
		Assert.assertEquals(expectedDetails.getLastUpdateTime(), json.get("last-modification").asLong());

		JsonNode taskCounts = json.get("tasks");
		Assert.assertEquals(expectedDetails.getNumTasks(), taskCounts.get("total").asInt());

		int[] perState = expectedDetails.getNumVerticesPerExecutionState();

		// "pending" aggregates every state that precedes RUNNING
		int expectedPending = count(perState, ExecutionState.CREATED)
			+ count(perState, ExecutionState.SCHEDULED)
			+ count(perState, ExecutionState.DEPLOYING);
		Assert.assertEquals(expectedPending, taskCounts.get("pending").asInt());

		Assert.assertEquals(count(perState, ExecutionState.RUNNING), taskCounts.get("running").asInt());
		Assert.assertEquals(count(perState, ExecutionState.FINISHED), taskCounts.get("finished").asInt());
		Assert.assertEquals(count(perState, ExecutionState.CANCELING), taskCounts.get("canceling").asInt());
		Assert.assertEquals(count(perState, ExecutionState.CANCELED), taskCounts.get("canceled").asInt());
		Assert.assertEquals(count(perState, ExecutionState.FAILED), taskCounts.get("failed").asInt());
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java new file mode 100644 index 0000000..06a99fe --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.Assert; +import org.junit.Test; + +import java.util.TimeZone; + +/** + * Tests for the DashboardConfigHandler. + */ +public class DashboardConfigHandlerTest { + /** The handler must register exactly one path, "/config". */ @Test + public void testGetPaths() { + DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/config", paths[0]); + } + + /** The JSON from createConfigJson must echo the refresh interval passed in and report the default time zone's display name and raw offset, the Flink version, and the revision formatted as commit id, " @ ", commit date. */ @Test + public void testJsonGeneration() throws Exception { + long refreshInterval = 12345; + /* expected values are read from the JVM default time zone and from EnvironmentInformation before the JSON is generated */ TimeZone timeZone = TimeZone.getDefault(); + EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); + + String json = DashboardConfigHandler.createConfigJson(refreshInterval); + + JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); + + Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong()); + Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText()); + Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong()); + Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText()); + Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java new file mode 100644 index 0000000..7e96835 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/HandlerRedirectUtilsTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.AkkaJobManagerGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the HandlerRedirectUtils. 
+ */ +public class HandlerRedirectUtilsTest extends TestLogger { + + /** Address passed to getRedirectAddress as the local REST address in both tests. */ private static final String localRestAddress = "http://127.0.0.1:1234"; + /** A different address, returned by the mocked gateway in the redirect test. */ private static final String remoteRestAddress = "http://127.0.0.2:1234"; + + /** When the mocked gateway reports the same REST address as the local one, the returned future must already be done and hold an empty Optional. */ @Test + public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception { + JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress)); + + CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress( + localRestAddress, + jobManagerGateway, + Time.seconds(3L)); + + Assert.assertTrue(redirectingAddressFuture.isDone()); + // no redirection needed + Assert.assertFalse(redirectingAddressFuture.get().isPresent()); + } + + /** When the mocked gateway (a mock of AkkaJobManagerGateway) reports a REST address different from the local one, the returned future must already be done and hold that remote address. */ @Test + public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception { + JobManagerGateway jobManagerGateway = mock(AkkaJobManagerGateway.class); + when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(remoteRestAddress)); + + CompletableFuture<Optional<String>> optRedirectingAddress = HandlerRedirectUtils.getRedirectAddress( + localRestAddress, + jobManagerGateway, + Time.seconds(3L)); + + Assert.assertTrue(optRedirectingAddress.isDone()); + + /* Optional.get() is called without a presence check, so an empty Optional surfaces as NoSuchElementException rather than an assertion failure */ Assert.assertEquals(remoteRestAddress, optRedirectingAddress.get().get()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java new file mode 100644 index 0000000..e1736c1 --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the JobAccumulatorsHandler. 
+ */ +public class JobAccumulatorsHandlerTest { + + /** Archiving the test job must produce exactly one archive whose path is "/jobs/", the job id, "/accumulators", and whose JSON passes compareAccumulators. */ @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/accumulators", archive.getPath()); + compareAccumulators(originalJob, archive.getJson()); + } + + /** A handler built with a mocked ExecutionGraphHolder must register exactly one path, "/jobs/:jobid/accumulators". */ @Test + public void testGetPaths() { + JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + String[] paths = handler.getPaths(); + Assert.assertEquals(1, paths.length); + Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]); + } + + /** The JSON returned by createJobAccumulatorsJson for the test job must pass compareAccumulators. */ @Test + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob); + + compareAccumulators(originalJob, json); + } + + /** Parses json and asserts that "job-accumulators" is an empty array and that "user-task-accumulators" matches the job's stringified accumulator results. Declares IOException because of the readTree call. */ private static void compareAccumulators(AccessExecutionGraph originalJob, String json) throws IOException { + JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json); + + ArrayNode accs = (ArrayNode) result.get("job-accumulators"); + Assert.assertEquals(0, accs.size()); + + /* requires at least one stringified accumulator on the job, presumably so the comparison below is not vacuous */ Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0); + ArchivedJobGenerationUtils.compareStringifiedAccumulators( + originalJob.getAccumulatorResultsStringified(), + (ArrayNode) result.get("user-task-accumulators")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java new file mode 100644 index 0000000..cab8835 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandlerTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + * Tests for the JobCancellationHandler. 
+ */ +public class JobCancellationHandlerTest { + /** The handler must register exactly two paths, "/jobs/:jobid/cancel" and "/jobs/:jobid/yarn-cancel"; their order in the array is not checked. */ @Test + public void testGetPaths() { + JobCancellationHandler handler = new JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT()); + String[] paths = handler.getPaths(); + Assert.assertEquals(2, paths.length); + /* copied into a List so membership can be tested with contains, independent of position */ List<String> pathsList = Lists.newArrayList(paths); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/cancel")); + Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-cancel")); + } +}