[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54b88d71
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54b88d71
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54b88d71

Branch: refs/heads/release-1.3
Commit: 54b88d71c78a670762e152b337c04a2e68e8481d
Parents: 17a4bb1
Author: zentol <[email protected]>
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol <[email protected]>
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/54b88d71/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	private long lastEmittedWatermark1;
 	private long lastEmittedWatermark2;
 
+	private Counter numRecordsIn;
+
 	private boolean isFinished;
 
 	@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		if (isFinished) {
 			return false;
 		}
+		if (numRecordsIn == null) {
+			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+		}
 
 		while (true) {
 			if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 						else {
 							StreamRecord<IN1> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement1(record);
 								streamOperator.processElement1(record);
 							}
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 						else {
 							StreamRecord<IN2> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement2(record);
 								streamOperator.processElement2(record);
 							}
