[FLINK-7608][metrics] Rework latency metric

This closes #5161.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e40cb348
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e40cb348
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e40cb348

Branch: refs/heads/master
Commit: e40cb34868e2c6ff7548653e1e5e2dfbe7d47967
Parents: dbb81ac
Author: yew1eb <[email protected]>
Authored: Sat Oct 14 17:19:49 2017 +0800
Committer: zentol <[email protected]>
Committed: Tue Feb 6 20:20:48 2018 +0100

----------------------------------------------------------------------
 .../metrics/DescriptiveStatisticsHistogram.java |  50 ++++++
 ...escriptiveStatisticsHistogramStatistics.java |  71 +++++++++
 .../api/operators/AbstractStreamOperator.java   | 151 +++----------------
 .../streaming/api/operators/StreamSink.java     |   4 +-
 .../streaming/api/operators/StreamSource.java   |   7 +-
 .../runtime/streamrecord/LatencyMarker.java     |  17 ++-
 .../streamrecord/StreamElementSerializer.java   |   8 +-
 .../flink/streaming/util/LatencyStats.java      |  62 ++++++++
 .../operators/StreamSourceOperatorTest.java     |   4 +-
 9 files changed, 223 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
new file mode 100644
index 0000000..f01b984
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+/**
+ * A Flink {@link Histogram} backed by a {@link DescriptiveStatistics} instance
+ * which only retains the most recent {@code windowSize} values.
+ *
+ * <p>{@link #getCount()} reports the total number of recorded values, whereas the
+ * statistics returned by {@link #getStatistics()} only cover the values still in the window.
+ */
+public class DescriptiveStatisticsHistogram implements Histogram {
+
+       private final DescriptiveStatistics descriptiveStatistics;
+
+       /** Total number of values passed to {@link #update(long)}; not bounded by the window size. */
+       private long elementsSeen;
+
+       /**
+        * Creates a histogram that retains the given number of most recent values.
+        *
+        * @param windowSize number of most recent values used for computing the statistics
+        */
+       public DescriptiveStatisticsHistogram(int windowSize) {
+               this.descriptiveStatistics = new DescriptiveStatistics(windowSize);
+       }
+
+       @Override
+       public void update(long value) {
+               this.elementsSeen++;
+               this.descriptiveStatistics.addValue(value);
+       }
+
+       @Override
+       public long getCount() {
+               // DescriptiveStatistics#getN() is capped at the window size, hence the separate counter
+               return this.elementsSeen;
+       }
+
+       @Override
+       public HistogramStatistics getStatistics() {
+               return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
new file mode 100644
index 0000000..01d4c30
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+import java.util.Arrays;
+
+/**
+ * DescriptiveStatistics histogram statistics implementation returned by {@link DescriptiveStatisticsHistogram}.
+ *
+ * <p>The values of the given {@link DescriptiveStatistics} are copied when this object is created,
+ * so all methods of one instance operate on the same point-in-time snapshot, independent of
+ * values recorded afterwards.
+ */
+public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistics {
+       private final DescriptiveStatistics descriptiveStatistics;
+
+       /**
+        * Creates statistics over a copy of the values currently held by the given instance.
+        *
+        * @param latencyHistogram statistics whose current values are copied
+        */
+       public DescriptiveStatisticsHistogramStatistics(DescriptiveStatistics latencyHistogram) {
+               // take a snapshot so later updates to the live histogram cannot change these statistics
+               this.descriptiveStatistics = new DescriptiveStatistics(latencyHistogram.getValues());
+       }
+
+       @Override
+       public double getQuantile(double quantile) {
+               // DescriptiveStatistics expects a percentile, hence the scaling by 100
+               return descriptiveStatistics.getPercentile(quantile * 100);
+       }
+
+       @Override
+       public long[] getValues() {
+               return Arrays.stream(descriptiveStatistics.getValues()).mapToLong(i -> (long) i).toArray();
+       }
+
+       @Override
+       public int size() {
+               return (int) descriptiveStatistics.getN();
+       }
+
+       @Override
+       public double getMean() {
+               return descriptiveStatistics.getMean();
+       }
+
+       @Override
+       public double getStdDev() {
+               return descriptiveStatistics.getStandardDeviation();
+       }
+
+       @Override
+       public long getMax() {
+               return (long) descriptiveStatistics.getMax();
+       }
+
+       @Override
+       public long getMin() {
+               return (long) descriptiveStatistics.getMin();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7db157c..42b6923 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -31,12 +31,12 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -60,21 +60,18 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
 import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.Serializable;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Base class for all stream operators. Operators that contain a user function should extend the class
@@ -150,7 +147,7 @@ public abstract class AbstractStreamOperator<OUT>
        /** Metric group for the operator. */
        protected transient OperatorMetricGroup metrics;
 
-       protected transient LatencyGauge latencyGauge;
+       protected transient LatencyStats latencyStats;
 
        // ---------------- time handler ------------------
 
@@ -188,14 +185,21 @@ public abstract class AbstractStreamOperator<OUT>
                        this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
                        this.output = output;
                }
-               Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
-               int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
-               if (historySize <= 0) {
-                       LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
-                       historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
+
+               try {
+                       Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
+                       int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
+                       if (historySize <= 0) {
+                               LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
+                               historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
+                       }
+                       TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
+                       this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
+               } catch (Exception e) {
+                       LOG.warn("An error occurred while instantiating latency metrics.", e);
+                       this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID());
                }
 
-               latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
                this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
 
                stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
@@ -642,7 +646,7 @@ public abstract class AbstractStreamOperator<OUT>
 
        protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
                // all operators are tracking latencies
-               this.latencyGauge.reportLatency(marker, false);
+               this.latencyStats.reportLatency(marker);
 
                // everything except sinks forwards latency markers
                this.output.emitLatencyMarker(marker);
@@ -650,127 +654,6 @@ public abstract class AbstractStreamOperator<OUT>
 
        // ----------------------- Helper classes -----------------------
 
-
-       /**
-        * The gauge uses a HashMap internally to avoid classloading issues when accessing
-        * the values using JMX.
-        */
-       protected static class LatencyGauge implements Gauge<Map<String, HashMap<String, Double>>> {
-               private final Map<LatencySourceDescriptor, DescriptiveStatistics> latencyStats = new HashMap<>();
-               private final int historySize;
-
-               LatencyGauge(int historySize) {
-                       this.historySize = historySize;
-               }
-
-               public void reportLatency(LatencyMarker marker, boolean isSink) {
-                       LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, !isSink);
-                       DescriptiveStatistics sourceStats = latencyStats.get(sourceDescriptor);
-                       if (sourceStats == null) {
-                               // 512 element window (4 kb)
-                               sourceStats = new DescriptiveStatistics(this.historySize);
-                               latencyStats.put(sourceDescriptor, sourceStats);
-                       }
-                       long now = System.currentTimeMillis();
-                       sourceStats.addValue(now - marker.getMarkedTime());
-               }
-
-               @Override
-               public Map<String, HashMap<String, Double>> getValue() {
-                       while (true) {
-                               try {
-                                       Map<String, HashMap<String, Double>> ret = new HashMap<>();
-                                       for (Map.Entry<LatencySourceDescriptor, DescriptiveStatistics> source : latencyStats.entrySet()) {
-                                               HashMap<String, Double> sourceStatistics = new HashMap<>(6);
-                                               sourceStatistics.put("max", source.getValue().getMax());
-                                               sourceStatistics.put("mean", source.getValue().getMean());
-                                               sourceStatistics.put("min", source.getValue().getMin());
-                                               sourceStatistics.put("p50", source.getValue().getPercentile(50));
-                                               sourceStatistics.put("p95", source.getValue().getPercentile(95));
-                                               sourceStatistics.put("p99", source.getValue().getPercentile(99));
-                                               ret.put(source.getKey().toString(), sourceStatistics);
-                                       }
-                                       return ret;
-                                       // Concurrent access onto the "latencyStats" map could cause
-                                       // ConcurrentModificationExceptions. To avoid unnecessary blocking
-                                       // of the reportLatency() method, we retry this operation until
-                                       // it succeeds.
-                               } catch (ConcurrentModificationException ignore) {
-                                       LOG.debug("Unable to report latency statistics", ignore);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Identifier for a latency source.
-        */
-       private static class LatencySourceDescriptor {
-               /**
-                * A unique ID identifying a logical source in Flink.
-                */
-               private final int vertexID;
-
-               /**
-                * Identifier for parallel subtasks of a logical source.
-                */
-               private final int subtaskIndex;
-
-               /**
-                * Creates a {@code LatencySourceDescriptor} from a given {@code LatencyMarker}.
-                *
-                * @param marker The latency marker to extract the LatencySourceDescriptor from.
-                * @param ignoreSubtaskIndex Set to true to ignore the subtask index, to treat the latencies
-                *      from all the parallel instances of a source as the same.
-                * @return A LatencySourceDescriptor for the given marker.
-                */
-               public static LatencySourceDescriptor of(LatencyMarker marker, boolean ignoreSubtaskIndex) {
-                       if (ignoreSubtaskIndex) {
-                               return new LatencySourceDescriptor(marker.getVertexID(), -1);
-                       } else {
-                               return new LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex());
-                       }
-
-               }
-
-               private LatencySourceDescriptor(int vertexID, int subtaskIndex) {
-                       this.vertexID = vertexID;
-                       this.subtaskIndex = subtaskIndex;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-
-                       LatencySourceDescriptor that = (LatencySourceDescriptor) o;
-
-                       if (vertexID != that.vertexID) {
-                               return false;
-                       }
-                       return subtaskIndex == that.subtaskIndex;
-               }
-
-               @Override
-               public int hashCode() {
-                       int result = vertexID;
-                       result = 31 * result + subtaskIndex;
-                       return result;
-               }
-
-               @Override
-               public String toString() {
-                       return "LatencySourceDescriptor{" +
-                                       "vertexID=" + vertexID +
-                                       ", subtaskIndex=" + subtaskIndex +
-                                       '}';
-               }
-       }
-
        /**
         * Wrapping {@link Output} that updates metrics on the number of emitted elements.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index 667e130..1c70876 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -57,9 +57,9 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
        }
 
        @Override
-       protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+       protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
                // all operators are tracking latencies
-               this.latencyGauge.reportLatency(maker, true);
+               this.latencyStats.reportLatency(marker);
 
                // sinks don't forward latency markers
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 744c90a..5600d8f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -67,7 +68,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                                getProcessingTimeService(),
                                collector,
                                getExecutionConfig().getLatencyTrackingInterval(),
-                               getOperatorConfig().getVertexID(),
+                               this.getOperatorID(),
                                getRuntimeContext().getIndexOfThisSubtask());
                }
 
@@ -138,7 +139,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                                final ProcessingTimeService processingTimeService,
                                final Output<StreamRecord<OUT>> output,
                                long latencyTrackingInterval,
-                               final int vertexID,
+                               final OperatorID operatorId,
                                final int subtaskIndex) {
 
                        latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
@@ -147,7 +148,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                                        public void onProcessingTime(long timestamp) throws Exception {
                                                try {
                                                        // ProcessingTimeService callbacks are executed under the checkpointing lock
-                                                       output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));
+                                                       output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                                                } catch (Throwable t) {
                                                        // we catch the Throwables here so that we don't trigger the processing
                                                        // timer services async exception handler

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
index 84af297..932e130 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.streamrecord;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 
 /**
  * Special record type carrying a timestamp of its creation time at a source operator
@@ -35,16 +36,16 @@ public final class LatencyMarker extends StreamElement {
        /** The time the latency mark is denoting. */
        private final long markedTime;
 
-       private final int vertexID;
+       private final OperatorID operatorId;
 
        private final int subtaskIndex;
 
        /**
         * Creates a latency mark with the given timestamp.
         */
-       public LatencyMarker(long markedTime, int vertexID, int subtaskIndex) {
+       public LatencyMarker(long markedTime, OperatorID operatorId, int subtaskIndex) {
                this.markedTime = markedTime;
-               this.vertexID = vertexID;
+               this.operatorId = operatorId;
                this.subtaskIndex = subtaskIndex;
        }
 
@@ -55,8 +56,8 @@ public final class LatencyMarker extends StreamElement {
                return markedTime;
        }
 
-       public int getVertexID() {
-               return vertexID;
+       public OperatorID getOperatorId() {
+               return operatorId;
        }
 
        public int getSubtaskIndex() {
@@ -79,7 +80,7 @@ public final class LatencyMarker extends StreamElement {
                if (markedTime != that.markedTime) {
                        return false;
                }
-               if (vertexID != that.vertexID) {
+               if (!operatorId.equals(that.operatorId)) {
                        return false;
                }
                return subtaskIndex == that.subtaskIndex;
@@ -89,7 +90,7 @@ public final class LatencyMarker extends StreamElement {
        @Override
        public int hashCode() {
                int result = (int) (markedTime ^ (markedTime >>> 32));
-               result = 31 * result + vertexID;
+               result = 31 * result + operatorId.hashCode();
                result = 31 * result + subtaskIndex;
                return result;
        }
@@ -98,7 +99,7 @@ public final class LatencyMarker extends StreamElement {
        public String toString() {
                return "LatencyMarker{" +
                                "markedTime=" + markedTime +
-                               ", vertexID=" + vertexID +
+                               ", operatorId=" + operatorId +
                                ", subtaskIndex=" + subtaskIndex +
                                '}';
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index d0ab60a..ba92416 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
@@ -186,7 +187,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
                else if (value.isLatencyMarker()) {
                        target.write(TAG_LATENCY_MARKER);
                        target.writeLong(value.asLatencyMarker().getMarkedTime());
-                       target.writeInt(value.asLatencyMarker().getVertexID());
+                       target.writeLong(value.asLatencyMarker().getOperatorId().getLowerPart());
+                       target.writeLong(value.asLatencyMarker().getOperatorId().getUpperPart());
                        target.writeInt(value.asLatencyMarker().getSubtaskIndex());
                }
                else {
@@ -211,7 +213,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
                        return new StreamStatus(source.readInt());
                }
                else if (tag == TAG_LATENCY_MARKER) {
-                       return new LatencyMarker(source.readLong(), source.readInt(), source.readInt());
+                       return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
                }
                else {
                        throw new IOException("Corrupt stream, found tag: " + tag);
@@ -238,7 +240,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
                        return new Watermark(source.readLong());
                }
                else if (tag == TAG_LATENCY_MARKER) {
-                       return new LatencyMarker(source.readLong(), source.readInt(), source.readInt());
+                       return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
                }
                else {
                        throw new IOException("Corrupt stream, found tag: " + tag);

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
new file mode 100644
index 0000000..4f3d33e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The {@link LatencyStats} objects are used to track and report on the behavior of latencies across measurements.
+ *
+ * <p>One histogram is registered per latency source, identified by the source operator id and
+ * the source subtask index carried by each {@link LatencyMarker}.
+ */
+public class LatencyStats {
+       /** Histograms keyed by "{sourceOperatorId}_{sourceSubtaskIndex}". */
+       private final Map<String, DescriptiveStatisticsHistogram> latencyStats = new HashMap<>();
+       private final MetricGroup metricGroup;
+       private final int historySize;
+       private final int subtaskIndex;
+       private final OperatorID operatorId;
+
+       /**
+        * Creates the latency statistics of one operator subtask.
+        *
+        * @param metricGroup group under which the per-source latency histograms are registered
+        * @param historySize window size of each latency histogram
+        * @param subtaskIndex subtask index of the operator reporting the latencies
+        * @param operatorID id of the operator reporting the latencies
+        */
+       public LatencyStats(MetricGroup metricGroup, int historySize, int subtaskIndex, OperatorID operatorID) {
+               this.metricGroup = metricGroup;
+               this.historySize = historySize;
+               this.subtaskIndex = subtaskIndex;
+               this.operatorId = operatorID;
+       }
+
+       /**
+        * Records the difference between the current wall-clock time and the marker's marked time,
+        * registering a new histogram the first time a marker from a given source subtask arrives.
+        *
+        * @param marker the latency marker received by this operator
+        */
+       public void reportLatency(LatencyMarker marker) {
+               // this operator's id and subtask index are fixed per instance, so the source id and source
+               // subtask index suffice as key; the separator keeps the key unambiguous
+               final String uniqueName = marker.getOperatorId() + "_" + marker.getSubtaskIndex();
+               DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName);
+               if (latencyHistogram == null) {
+                       latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
+                       this.latencyStats.put(uniqueName, latencyHistogram);
+                       this.metricGroup
+                               .addGroup("source_id", String.valueOf(marker.getOperatorId()))
+                               .addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()))
+                               .addGroup("operator_id", String.valueOf(operatorId))
+                               .addGroup("operator_subtask_index", String.valueOf(subtaskIndex))
+                               .histogram("latency", latencyHistogram);
+               }
+
+               long now = System.currentTimeMillis();
+               latencyHistogram.update(now - marker.getMarkedTime());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e40cb348/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index d9fcc12..cf09a6e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -214,7 +215,7 @@ public class StreamSourceOperatorTest {
                for (; i < output.size() - 1; i++) {
                        StreamElement se = output.get(i);
                        Assert.assertTrue(se.isLatencyMarker());
-                       Assert.assertEquals(-1, se.asLatencyMarker().getVertexID());
+                       Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId());
                        Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
                        Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
 
@@ -290,6 +291,7 @@ public class StreamSourceOperatorTest {
                cfg.setStateBackend(new MemoryStateBackend());
 
                cfg.setTimeCharacteristic(timeChar);
+               cfg.setOperatorID(new OperatorID());
 
                Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
 

Reply via email to