[FLINK-3949] [metrics] Add numSplitsProcessed counter metric.

This closes #2119


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a0c268d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a0c268d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a0c268d

Branch: refs/heads/master
Commit: 5a0c268dbd4abdf39c7b9d8f25ea629dfd4681b1
Parents: 18744b2
Author: zentol <[email protected]>
Authored: Fri Jun 17 09:40:01 2016 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/operators/DataSourceTask.java    | 4 ++--
 .../api/functions/source/ContinuousFileReaderOperator.java    | 3 +++
 .../api/functions/source/InputFormatSourceFunction.java       | 3 +++
 .../api/functions/source/InputFormatSourceFunctionTest.java   | 7 +++++++
 4 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index c57f133..68e29b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -101,7 +101,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                LOG.debug(getLogString("Starting data source operator"));
 
                RuntimeContext ctx = createRuntimeContext();
-               Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
+               Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
                Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");
 
                if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
@@ -172,7 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                                        // close. We close here such that a regular close throwing an exception marks a task as failed.
                                        format.close();
                                }
-                               splitCounter.inc();
+                               completedSplitsCounter.inc();
                        } // end for all input splits
 
                        // close the collector. if it is a chaining task collector, it will close its chained tasks

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 9319338..455c753 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -235,6 +236,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                public void run() {
                        try {
 
+                               Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
                                this.format.openInputFormat();
 
                                while (this.isRunning) {
@@ -290,6 +292,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                                                                }
                                                        }
                                                }
+                                               completedSplitsCounter.inc();
 
                                        } finally {
                                                // close and prepare for the next iteration

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index bce1ec5..f35cbba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
@@ -70,6 +71,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
        public void run(SourceContext<OUT> ctx) throws Exception {
                try {
 
+                       Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
                        if (isRunning && format instanceof RichInputFormat) {
                                ((RichInputFormat) format).openInputFormat();
                        }
@@ -86,6 +88,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
                                        ctx.collect(nextElement);
                                }
                                format.close();
+                               completedSplitsCounter.inc();
 
                                if (isRunning) {
                                        isRunning = splitIterator.hasNext();

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 32776e1..a41c7db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -253,6 +255,11 @@ public class InputFormatSourceFunctionTest {
                }
 
                @Override
+               public MetricGroup getMetricGroup() {
+                       return new UnregisteredMetricsGroup();
+               }
+
+               @Override
                public InputSplitProvider getInputSplitProvider() {
                        try {
                                this.inputSplits = format.createInputSplits(noOfSplits);

Reply via email to