[ https://issues.apache.org/jira/browse/FLINK-5852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887999#comment-15887999 ]
ASF GitHub Bot commented on FLINK-5852:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3365#discussion_r103431658
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+public class JsonUtils {
+ public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+
+ @Public
+ public static final class Keys {
+ public static final String TASKMANAGERS = "taskmanagers";
+ public static final String JOB_ID = "jid";
+ public static final String ID = "id";
+ public static final String NAME = "name";
+ public static final String STATE = "state";
+ public static final String IS_STOPPABLE = "isStoppable";
+ public static final String PARALLELISM = "parallelism";
+ public static final String PLAN = "plan";
+
+ public static final String START_TIME = "start-time";
+ public static final String END_TIME = "end-time";
+ public static final String DURATION = "duration";
+ public static final String NOW = "now";
+ public static final String LAST_MODIFICATION =
"last-modification";
+
+ public static final String TIMESTAMP = "timestamp";
+ public static final String TIMESTAMPS = "timestamps";
+ public static final String STATUS_COUNTS = "status-counts";
+
+ public static final String REFRESH_INTERVAL =
"refresh-interval";
+ public static final String TIMEZONE_OFFSET = "timezone-offset";
+ public static final String TIMEZONE_NAME = "timezone-name";
+ public static final String FLINK_VERSION = "flink-version";
+ public static final String FLINK_REVISION = "flink-revision";
+
+ public static final String EXECUTION_CONFIG =
"execution-config";
+ public static final String MODE = "mode";
+ public static final String EXECUTION_MODE = "execution-mode";
+ public static final String RESTART_STRATEGY =
"restart-strategy";
+ public static final String JOB_PARALLELISM = "job-parallelism";
+ public static final String OBJECT_REUSE_MODE =
"object-reuse-mode";
+ public static final String USER_CONFIG = "user-config";
+
+ public static final String ROOT_EXCEPTION = "root-exception";
+ public static final String ALL_EXCEPTIONS = "all-exceptions";
+ public static final String EXCEPTION = "exception";
+ public static final String TRUNCATED = "truncated";
+
+ public static final String HOST = "host";
+ public static final String LOCATION = "location";
+
+ public static final String VERTICES = "vertices";
+ public static final String TASKS = "tasks";
+ public static final String TASK = "task";
+ public static final String SUBTASKS = "subtasks";
+ public static final String SUBTASK = "subtask";
+ public static final String ATTEMPT = "attempt";
+
+ public static final String STATUS = "status";
+ public static final String TOTAL = "total";
+ public static final String PENDING = "pending";
+ public static final String RUNNING = "running";
+ public static final String FINISHED = "finished";
+ public static final String CANCELING = "canceling";
+ public static final String CANCELED = "canceled";
+ public static final String FAILED = "failed";
+ public static final String RESTORED = "restored";
+ public static final String PENDING_OR_FAILED =
"pending_or_failed";
+ public static final String DISCARDED = "discarded";
+ public static final String IN_PROGRESS = "in_progress";
+ public static final String COMPLETED = "completed";
+
+ public static final String METRICS = "metrics";
+ public static final String WRITE_BYTES = "write-bytes";
+ public static final String READ_BYTES = "read-bytes";
+ public static final String WRITE_RECORDS = "write-records";
+ public static final String READ_RECORDS = "read-records";
+ public static final String TYPE = "type";
+ public static final String VALUE = "value";
+
+ public static final String MIN = "min";
+ public static final String MAX = "max";
+ public static final String AVG = "avg";
+
+ public static final String JOB_ACCUMULATORS =
"job-accumulators";
+ public static final String USER_ACCUMULATORS =
"user-accumulators";
+ public static final String USER_TASK_ACCUMULATORS =
"user-task-accumulators";
+
+ public static final String COUNTS = "counts";
+ public static final String EXTERNALIZATION = "externalization";
+ public static final String EXTERNAL_PATH = "external-path";
+ public static final String DELETE_ON_CANCEL =
"delete_on_cancellation";
+ public static final String HISTORY = "history";
+
+ public static final String SUMMARY = "summary";
+ public static final String STATE_SIZE = "state_size";
+ public static final String ETE_DURATION = "end_to_end_duration";
+ public static final String ALIGNMENT_BUFFERED =
"alignment_buffered";
+ public static final String SAVEPOINT = "savepoint";
+ public static final String IS_SAVEPOINT = "is_savepoint";
+ public static final String CHECKPOINT = "checkpoint";
+ public static final String CHECKPOINT_DURATION =
"checkpoint_duration";
+ public static final String SYNC = "sync";
+ public static final String ASYNC = "async";
+ public static final String ALIGNMENT = "alignment";
+ public static final String BUFFERED = "buffered";
+
+ public static final String LATEST = "latest";
+
+ public static final String FAILURE_TIMESTAMP =
"failure_timestamp";
+ public static final String FAILURE_MESSAGE = "failure_message";
+ public static final String RESTORE_TIMESTAMP =
"restore_timestamp";
+
+ public static final String TRIGGER_TIMESTAMP =
"trigger_timestamp";
+ public static final String ACK_TIMESTAMP = "ack_timestamp";
+ public static final String LATEST_ACK_TIMESTAMP =
"latest_ack_timestamp";
+
+ public static final String NUM_SUBTASKS = "num_subtasks";
+ public static final String NUM_ACK_SUBTASKS =
"num_acknowledged_subtasks";
+ public static final String INDEX = "index";
+ public static final String INTERVAL = "interval";
+ public static final String ENABLED = "enabled";
+ public static final String TIMEOUT = "timeout";
+ public static final String MIN_PAUSE = "min_pause";
+ public static final String MAX_CONCURRENT = "max_concurrent";
+
+ private Keys() {
+ }
+ }
+
+ public static void writeJobDetailOverviewAsJson(AccessExecutionGraph
graph, JsonGenerator gen) throws IOException {
+
CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph),
gen, System.currentTimeMillis());
+ }
+
+ public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats
minMaxAvg) throws IOException {
+ gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
+ gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
+ gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
+ }
+
+ public static void addIOMetrics(MutableIOMetrics summedMetrics,
ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher
fetcher, String jobID, String taskID, int subtaskIndex) {
+ if (state.isTerminal()) {
+ if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+
summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
+
summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
+
summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
+
summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
+
summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
+ }
+ } else { // execAttempt is still running, use MetricQueryService instead
+ if (fetcher != null) {
+ fetcher.update();
+ MetricStore.SubtaskMetricStore metrics =
fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
+ if (metrics != null) {
+
summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL,
"0")));
+
summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE,
"0")));
+
summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT,
"0")));
+
summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN,
"0")));
+
summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT,
"0")));
+ }
+ }
+ }
+ }
+
+ public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics)
throws IOException {
--- End diff --
Move this to `MutableIOMetrics`
> Move JSON generation code into static methods
> ---------------------------------------------
>
> Key: FLINK-5852
> URL: https://issues.apache.org/jira/browse/FLINK-5852
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Fix For: 1.3.0
>
>
> In order to implement the HistoryServer, we need a way to generate the JSON
> responses independently of the REST API. As such, I suggest moving the main
> parts of the generation code for job-specific handlers into static methods.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)