zentol closed pull request #2753: [FLINK-4840] [metrics] Measure latency of record processing and expose it as a metric
URL: https://github.com/apache/flink/pull/2753
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 0a294fa65a1..8d86158065f 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -134,6 +134,12 @@ under the License.
                        <version>${jackson.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index c5296fbd063..5d06e2c2251 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -18,16 +18,21 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,6 +55,8 @@
        private final Meter numRecordsInRate;
        private final Meter numRecordsOutRate;
 
+       private final LatencyHistogram recordProcessLatency;
+
        public TaskIOMetricGroup(TaskMetricGroup parent) {
                super(parent);
 
@@ -63,6 +70,10 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
                this.numRecordsOut = counter("numRecordsOut", new SumCounter());
                this.numRecordsInRate = meter("numRecordsInPerSecond", new 
MeterView(numRecordsIn, 60));
                this.numRecordsOutRate = meter("numRecordsOutPerSecond", new 
MeterView(numRecordsOut, 60));
+               this.recordProcessLatency = histogram("recordProcessLatency", 
new LatencyHistogram(true));
+               if (recordProcessLatency.getLatencyAccumulateCounter() != null) 
{
+                       meter("recordProcTimeProportion", new 
MeterView(recordProcessLatency.getLatencyAccumulateCounter(), 60));
+               }
        }
 
        public IOMetrics createSnapshot() {
@@ -104,6 +115,10 @@ public Meter getNumBytesOutRateMeter() {
                return numBytesOutRate;
        }
 
+       public Histogram getRecordProcessLatency() {
+               return recordProcessLatency;
+       }
+
       // ============================================================================================
       // Buffer metrics
       // ============================================================================================
@@ -257,4 +272,134 @@ public long getCount() {
                        return sum;
                }
        }
+
+       // ============================================================================================
+       // Latency metrics
+       // ============================================================================================
+
+       /**
+        * Histogram measuring the record processing latency of a task.
+        * For regular tasks this is the per-element processing time; for source tasks it is the per-element emit time.
+        * It can be constructed with a history size or a {@link Reservoir} (default: a {@link SlidingWindowReservoir} of 128 entries).
+        * If accumulation is enabled, each latency is also added to a counter, truncated to whole milliseconds.
+        */
+       private static class LatencyHistogram implements Histogram {
+
+               private static final int DEFAULT_HISTORY_SIZE = 128;
+
+               // conversion of millisecond and nanosecond
+               private static final double NANOSECONDS_PER_MILLISECOND = 
1000000.0D;
+
+               // a reservoir for history data
+               private final Reservoir latencyReservoir;
+
+               // accumulate latency for measurement processing time per second
+               private final SimpleCounter latencyAccumulateCounter;
+
+               public LatencyHistogram() {
+                       this(DEFAULT_HISTORY_SIZE);
+               }
+
+               public LatencyHistogram(int historySize) {
+                       //default disable accumulate
+                       this(historySize, false);
+               }
+
+               public LatencyHistogram(Reservoir latencyReservoir) {
+                       //default disable accumulate
+                       this(latencyReservoir, false);
+               }
+
+               public LatencyHistogram(boolean enableAccumulate) {
+                       this(DEFAULT_HISTORY_SIZE, enableAccumulate);
+               }
+
+               public LatencyHistogram(int historySize, boolean 
enableAccumulate) {
+                       //default with Sliding Window Reservoir
+                       this(new SlidingWindowReservoir(historySize), 
enableAccumulate);
+               }
+
+               public LatencyHistogram(Reservoir latencyReservoir, boolean 
enableAccumulate) {
+                       this.latencyReservoir = latencyReservoir;
+                       if (enableAccumulate) {
+                               latencyAccumulateCounter = new SimpleCounter();
+                       } else {
+                               latencyAccumulateCounter = null;
+                       }
+               }
+
+               @Override
+               public void update(long nanosecond) {
+                       latencyReservoir.update(nanosecond);
+                       if (latencyAccumulateCounter != null) {
+                               latencyAccumulateCounter.inc((long)(nanosecond 
/ NANOSECONDS_PER_MILLISECOND));
+                       }
+               }
+
+               @Override
+               public long getCount() {
+                       return latencyReservoir.size();
+               }
+
+               @Override
+               public HistogramStatistics getStatistics() {
+                       return new 
LatencyHistogramStatistics(latencyReservoir.getSnapshot());
+               }
+
+               public Counter getLatencyAccumulateCounter() {
+                       return latencyAccumulateCounter;
+               }
+
+
+               private static class LatencyHistogramStatistics extends 
HistogramStatistics {
+
+                       private final Snapshot latencySnapshot;
+
+                       public LatencyHistogramStatistics(Snapshot 
latencySnapshot) {
+                               this.latencySnapshot = latencySnapshot;
+                       }
+
+                       @Override
+                       public double getQuantile(double quantile) {
+                               return latencySnapshot.getValue(quantile) / 
NANOSECONDS_PER_MILLISECOND;
+                       }
+
+                       @Override
+                       public long[] getValues() {
+                               long [] nanos = latencySnapshot.getValues();
+                               long [] millis = new long[nanos.length];
+
+                               for (int i = 0; i < nanos.length; ++i) {
+                                       millis[i] = (long)(nanos[i] / 
NANOSECONDS_PER_MILLISECOND);
+                               }
+
+                               return millis;
+                       }
+
+                       @Override
+                       public int size() {
+                               return latencySnapshot.size();
+                       }
+
+                       @Override
+                       public double getMean() {
+                               return latencySnapshot.getMean() / 
NANOSECONDS_PER_MILLISECOND;
+                       }
+
+                       @Override
+                       public double getStdDev() {
+                               return latencySnapshot.getStdDev() / 
NANOSECONDS_PER_MILLISECOND;
+                       }
+
+                       @Override
+                       public long getMax() {
+                               return (long)(latencySnapshot.getMax() / 
NANOSECONDS_PER_MILLISECOND);
+                       }
+
+                       @Override
+                       public long getMin() {
+                               return (long)(latencySnapshot.getMin() / 
NANOSECONDS_PER_MILLISECOND);
+                       }
+               }
+       }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a659866bbfd..66b67aaa9b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -188,6 +188,11 @@ public MetricGroup getMetricGroup() {
                return metrics;
        }
 
+       @Override
+       public Output<StreamRecord<OUT>> getOutput() {
+               return output;
+       }
+
        @Override
        public final void initializeState(OperatorStateHandles stateHandles) 
throws Exception {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f6e547290e0..59086a94e7e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -136,4 +136,6 @@ OperatorSnapshotResult snapshotState(
        void setChainingStrategy(ChainingStrategy strategy);
        
        MetricGroup getMetricGroup();
+
+       Output<StreamRecord<OUT>> getOutput();
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 714317df9b7..60972aa6f76 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
@@ -81,6 +82,8 @@
 
        private Counter numRecordsIn;
 
+       private Histogram recordProcessLatency;
+
        @SuppressWarnings("unchecked")
        public StreamInputProcessor(
                        InputGate[] inputGates,
@@ -168,12 +171,15 @@ public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final
                                                continue;
                                        } else {
                                                // now we can do the actual 
processing
+                                               long start=System.nanoTime();
                                                StreamRecord<IN> record = 
recordOrMark.asRecord();
                                                synchronized (lock) {
                                                        numRecordsIn.inc();
                                                        
streamOperator.setKeyContextElement1(record);
                                                        
streamOperator.processElement(record);
                                                }
+                                               long end=System.nanoTime();
+                                               recordProcessLatency.update(end 
- start);
                                                return true;
                                        }
                                }
@@ -210,6 +216,8 @@ public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final
         * @param metrics metric group
         */
        public void setMetricGroup(TaskIOMetricGroup metrics) {
+               recordProcessLatency = metrics.getRecordProcessLatency();
+
                metrics.gauge("currentLowWatermark", new Gauge<Long>() {
                        @Override
                        public Long getValue() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 5f7ffe45a34..85963ad448b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,8 +20,11 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -85,6 +88,10 @@
        private final DeserializationDelegate<StreamElement> 
deserializationDelegate1;
        private final DeserializationDelegate<StreamElement> 
deserializationDelegate2;
 
+       private Counter numRecordsIn;
+
+       private Histogram recordProcessLatency;
+
        @SuppressWarnings({"unchecked", "rawtypes"})
        public StreamTwoInputProcessor(
                        Collection<InputGate> inputGates1,
@@ -149,6 +156,10 @@ public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
                        return false;
                }
 
+               if (numRecordsIn == null) {
+                       numRecordsIn = 
((OperatorMetricGroup)streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+               }
+
                while (true) {
                        if (currentRecordDeserializer != null) {
                                DeserializationResult result;
@@ -177,10 +188,14 @@ else if (recordOrWatermark.isLatencyMarker()) {
                                                        continue;
                                                }
                                                else {
+                                                       long 
start=System.nanoTime();
                                                        synchronized (lock) {
+                                                               
numRecordsIn.inc();
                                                                
streamOperator.setKeyContextElement1(recordOrWatermark.<IN1>asRecord());
                                                                
streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
                                                        }
+                                                       long 
end=System.nanoTime();
+                                                       
recordProcessLatency.update(end - start);
                                                        return true;
 
                                                }
@@ -198,10 +213,14 @@ else if (recordOrWatermark.isLatencyMarker()) {
                                                        continue;
                                                }
                                                else {
+                                                       long 
start=System.nanoTime();
                                                        synchronized (lock) {
+                                                               
numRecordsIn.inc();
                                                                
streamOperator.setKeyContextElement2(recordOrWatermark.<IN2>asRecord());
                                                                
streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
                                                        }
+                                                       long 
end=System.nanoTime();
+                                                       
recordProcessLatency.update(end - start);
                                                        return true;
                                                }
                                        }
@@ -275,6 +294,8 @@ private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Water
         * @param metrics metric group
         */
        public void setMetricGroup(TaskIOMetricGroup metrics) {
+               recordProcessLatency = metrics.getRecordProcessLatency();
+
                metrics.gauge("currentLowWatermark", new Gauge<Long>() {
                        @Override
                        public Long getValue() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 7ae99f6c036..ab5b866829d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -19,8 +19,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * Task for executing streaming sources.
@@ -40,24 +46,65 @@
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends 
StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {
 
+       private Output<StreamRecord<OUT>> output;
+
        @Override
        protected void init() {
-               // does not hold any resources, so no initialization needed
+               output = new SourceOutput<>(getHeadOutput(), 
getEnvironment().getMetricGroup().getIOMetricGroup());
        }
 
        @Override
        protected void cleanup() {
-               // does not hold any resources, so no cleanup needed
        }
        
 
        @Override
        protected void run() throws Exception {
-               headOperator.run(getCheckpointLock());
+               headOperator.run(getCheckpointLock(), output);
        }
        
        @Override
        protected void cancelTask() throws Exception {
                headOperator.cancel();
        }
+
+       /**
+        * Output wrapper for source tasks that times each emitted element and records the duration in the task's record-processing-latency histogram.
+        *
+        * @param <OUT> Type of the output elements of this source.
+        */
+       private static class SourceOutput<OUT> implements 
Output<StreamRecord<OUT>> {
+
+               private final Output<StreamRecord<OUT>> output;
+
+               private final Histogram recordProcessLatency;
+
+               public SourceOutput(Output<StreamRecord<OUT>> output, 
TaskIOMetricGroup ioMetricGroup) {
+                       this.output = output;
+                       this.recordProcessLatency = 
ioMetricGroup.getRecordProcessLatency();
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       output.emitWatermark(mark);
+               }
+
+               @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       output.emitLatencyMarker(latencyMarker);
+               }
+
+               @Override
+               public void collect(StreamRecord<OUT> record) {
+                       long start=System.nanoTime();
+                       output.collect(record);
+                       long end=System.nanoTime();
+                       recordProcessLatency.update(end - start);
+               }
+
+               @Override
+               public void close() {
+                       output.close();
+               }
+       }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fa7d1b09a50..c52c9d73b53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -486,7 +486,7 @@ public StreamConfig getConfiguration() {
        }
 
        Output<StreamRecord<OUT>> getHeadOutput() {
-               return operatorChain.getChainEntryPoint();
+               return headOperator.getOutput();
        }
 
        RecordWriterOutput<?>[] getStreamOutputs() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to