[FLINK-3949] [metrics] Add numSplitsProcessed counter metric. This closes #2119
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a0c268d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a0c268d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a0c268d Branch: refs/heads/master Commit: 5a0c268dbd4abdf39c7b9d8f25ea629dfd4681b1 Parents: 18744b2 Author: zentol <[email protected]> Authored: Fri Jun 17 09:40:01 2016 +0200 Committer: Fabian Hueske <[email protected]> Committed: Sat Jun 18 23:40:23 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/runtime/operators/DataSourceTask.java | 4 ++-- .../api/functions/source/ContinuousFileReaderOperator.java | 3 +++ .../api/functions/source/InputFormatSourceFunction.java | 3 +++ .../api/functions/source/InputFormatSourceFunctionTest.java | 7 +++++++ 4 files changed, 15 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index c57f133..68e29b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -101,7 +101,7 @@ public class DataSourceTask<OT> extends AbstractInvokable { LOG.debug(getLogString("Starting data source operator")); RuntimeContext ctx = createRuntimeContext(); - Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); + Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut"); if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { @@ -172,7 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable { // close. We close here such that a regular close throwing an exception marks a task as failed. format.close(); } - splitCounter.inc(); + completedSplitsCounter.inc(); } // end for all input splits // close the collector. if it is a chaining task collector, it will close its chained tasks http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 9319338..455c753 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -235,6 +236,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A public void run() { try { + Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed"); this.format.openInputFormat(); while (this.isRunning) { @@ -290,6 +292,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } } } + completedSplitsCounter.inc(); } finally { // close and prepare for the next iteration http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java index bce1ec5..f35cbba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -70,6 +71,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O public void run(SourceContext<OUT> ctx) throws Exception { try { + Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed"); if (isRunning && format instanceof RichInputFormat) { ((RichInputFormat) format).openInputFormat(); } @@ -86,6 +88,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O ctx.collect(nextElement); } format.close(); + completedSplitsCounter.inc(); if (isRunning) { isRunning = splitIterator.hasNext(); http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java index 32776e1..a41c7db 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java @@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -253,6 +255,11 @@ public class InputFormatSourceFunctionTest { } @Override + public MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); + } + + @Override public InputSplitProvider getInputSplitProvider() { try { this.inputSplits = format.createInputSplits(noOfSplits);
