[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54b88d71
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54b88d71
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54b88d71

Branch: refs/heads/release-1.3
Commit: 54b88d71c78a670762e152b337c04a2e68e8481d
Parents: 17a4bb1
Author: zentol <[email protected]>
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol <[email protected]>
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54b88d71/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
        private long lastEmittedWatermark1;
        private long lastEmittedWatermark2;
 
+       private Counter numRecordsIn;
+
        private boolean isFinished;
 
        @SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                if (isFinished) {
                        return false;
                }
+               if (numRecordsIn == null) {
+                       numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+               }
 
                while (true) {
                        if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                                                else {
                                                        StreamRecord<IN1> record = recordOrWatermark.asRecord();
                                                        synchronized (lock) {
+                                                               numRecordsIn.inc();
                                                                streamOperator.setKeyContextElement1(record);
                                                                streamOperator.processElement1(record);
                                                        }
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                                                else {
                                                        StreamRecord<IN2> record = recordOrWatermark.asRecord();
                                                        synchronized (lock) {
+                                                               numRecordsIn.inc();
                                                                streamOperator.setKeyContextElement2(record);
                                                                streamOperator.processElement2(record);
                                                        }

Reply via email to