Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3365#discussion_r103431658
--- Diff:
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java
---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+public class JsonUtils {
+ public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+
+ @Public
+ public static final class Keys {
+ public static final String TASKMANAGERS = "taskmanagers";
+ public static final String JOB_ID = "jid";
+ public static final String ID = "id";
+ public static final String NAME = "name";
+ public static final String STATE = "state";
+ public static final String IS_STOPPABLE = "isStoppable";
+ public static final String PARALLELISM = "parallelism";
+ public static final String PLAN = "plan";
+
+ public static final String START_TIME = "start-time";
+ public static final String END_TIME = "end-time";
+ public static final String DURATION = "duration";
+ public static final String NOW = "now";
+ public static final String LAST_MODIFICATION =
"last-modification";
+
+ public static final String TIMESTAMP = "timestamp";
+ public static final String TIMESTAMPS = "timestamps";
+ public static final String STATUS_COUNTS = "status-counts";
+
+ public static final String REFRESH_INTERVAL =
"refresh-interval";
+ public static final String TIMEZONE_OFFSET = "timezone-offset";
+ public static final String TIMEZONE_NAME = "timezone-name";
+ public static final String FLINK_VERSION = "flink-version";
+ public static final String FLINK_REVISION = "flink-revision";
+
+ public static final String EXECUTION_CONFIG =
"execution-config";
+ public static final String MODE = "mode";
+ public static final String EXECUTION_MODE = "execution-mode";
+ public static final String RESTART_STRATEGY =
"restart-strategy";
+ public static final String JOB_PARALLELISM = "job-parallelism";
+ public static final String OBJECT_REUSE_MODE =
"object-reuse-mode";
+ public static final String USER_CONFIG = "user-config";
+
+ public static final String ROOT_EXCEPTION = "root-exception";
+ public static final String ALL_EXCEPTIONS = "all-exceptions";
+ public static final String EXCEPTION = "exception";
+ public static final String TRUNCATED = "truncated";
+
+ public static final String HOST = "host";
+ public static final String LOCATION = "location";
+
+ public static final String VERTICES = "vertices";
+ public static final String TASKS = "tasks";
+ public static final String TASK = "task";
+ public static final String SUBTASKS = "subtasks";
+ public static final String SUBTASK = "subtask";
+ public static final String ATTEMPT = "attempt";
+
+ public static final String STATUS = "status";
+ public static final String TOTAL = "total";
+ public static final String PENDING = "pending";
+ public static final String RUNNING = "running";
+ public static final String FINISHED = "finished";
+ public static final String CANCELING = "canceling";
+ public static final String CANCELED = "canceled";
+ public static final String FAILED = "failed";
+ public static final String RESTORED = "restored";
+ public static final String PENDING_OR_FAILED =
"pending_or_failed";
+ public static final String DISCARDED = "discarded";
+ public static final String IN_PROGRESS = "in_progress";
+ public static final String COMPLETED = "completed";
+
+ public static final String METRICS = "metrics";
+ public static final String WRITE_BYTES = "write-bytes";
+ public static final String READ_BYTES = "read-bytes";
+ public static final String WRITE_RECORDS = "write-records";
+ public static final String READ_RECORDS = "read-records";
+ public static final String TYPE = "type";
+ public static final String VALUE = "value";
+
+ public static final String MIN = "min";
+ public static final String MAX = "max";
+ public static final String AVG = "avg";
+
+ public static final String JOB_ACCUMULATORS =
"job-accumulators";
+ public static final String USER_ACCUMULATORS =
"user-accumulators";
+ public static final String USER_TASK_ACCUMULATORS =
"user-task-accumulators";
+
+ public static final String COUNTS = "counts";
+ public static final String EXTERNALIZATION = "externalization";
+ public static final String EXTERNAL_PATH = "external-path";
+ public static final String DELETE_ON_CANCEL =
"delete_on_cancellation";
+ public static final String HISTORY = "history";
+
+ public static final String SUMMARY = "summary";
+ public static final String STATE_SIZE = "state_size";
+ public static final String ETE_DURATION = "end_to_end_duration";
+ public static final String ALIGNMENT_BUFFERED =
"alignment_buffered";
+ public static final String SAVEPOINT = "savepoint";
+ public static final String IS_SAVEPOINT = "is_savepoint";
+ public static final String CHECKPOINT = "checkpoint";
+ public static final String CHECKPOINT_DURATION =
"checkpoint_duration";
+ public static final String SYNC = "sync";
+ public static final String ASYNC = "async";
+ public static final String ALIGNMENT = "alignment";
+ public static final String BUFFERED = "buffered";
+
+ public static final String LATEST = "latest";
+
+ public static final String FAILURE_TIMESTAMP =
"failure_timestamp";
+ public static final String FAILURE_MESSAGE = "failure_message";
+ public static final String RESTORE_TIMESTAMP =
"restore_timestamp";
+
+ public static final String TRIGGER_TIMESTAMP =
"trigger_timestamp";
+ public static final String ACK_TIMESTAMP = "ack_timestamp";
+ public static final String LATEST_ACK_TIMESTAMP =
"latest_ack_timestamp";
+
+ public static final String NUM_SUBTASKS = "num_subtasks";
+ public static final String NUM_ACK_SUBTASKS =
"num_acknowledged_subtasks";
+ public static final String INDEX = "index";
+ public static final String INTERVAL = "interval";
+ public static final String ENABLED = "enabled";
+ public static final String TIMEOUT = "timeout";
+ public static final String MIN_PAUSE = "min_pause";
+ public static final String MAX_CONCURRENT = "max_concurrent";
+
+ private Keys() {
+ }
+ }
+
+ public static void writeJobDetailOverviewAsJson(AccessExecutionGraph
graph, JsonGenerator gen) throws IOException {
+
CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph),
gen, System.currentTimeMillis());
+ }
+
+ public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats
minMaxAvg) throws IOException {
+ gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
+ gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
+ gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
+ }
+
+ public static void addIOMetrics(MutableIOMetrics summedMetrics,
ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher
fetcher, String jobID, String taskID, int subtaskIndex) {
+ if (state.isTerminal()) {
+ if (ioMetrics != null) { // execAttempt is already
finished, use final metrics stored in ExecutionGraph
+
summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
+
summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
+
summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
+
summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
+
summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
+ }
+ } else { // execAttempt is still running, use
MetricQueryService instead
+ if (fetcher != null) {
+ fetcher.update();
+ MetricStore.SubtaskMetricStore metrics =
fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
+ if (metrics != null) {
+
summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL,
"0")));
+
summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE,
"0")));
+
summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT,
"0")));
+
summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN,
"0")));
+
summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT,
"0")));
+ }
+ }
+ }
+ }
+
+ public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics)
throws IOException {
--- End diff --
Move this to `MutableIOMetrics`
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---