This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch FLINK-19743
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 560f61e33a8307b83a989c5e6e656c4f45edb092
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Thu Oct 22 08:21:21 2020 +0800

    [FLINK-19743][connectors/common] Add metric definitions of Source. (FLIP-33)
    - Expose the Source metrics via SourceMetricGroup and SourceReaderContext.
    - Let SourceOperator report the WatermarkLag.
---
 .../file/src/FileSourceHeavyThroughputTest.java    |  8 +--
 .../api/connector/source/SourceReaderContext.java  |  4 +-
 .../source/metrics/SourceMetricGroup.java          | 54 +++++++++++++++
 .../source/lib/NumberSequenceSourceTest.java       | 26 +++++++-
 .../source/mocks/MockSourceMetricGroup.java        | 50 ++++++++++++++
 .../connector/source/mocks/MockSourceReader.java   | 12 +++-
 .../metrics/groups/SourceMetricGroupImpl.java      | 76 ++++++++++++++++++++++
 flink-streaming-java/pom.xml                       |  7 ++
 .../streaming/api/operators/SourceOperator.java    | 58 +++++++++++++++--
 .../source/NoOpTimestampsAndWatermarks.java        |  6 ++
 .../source/ProgressiveTimestampsAndWatermarks.java | 14 +++-
 .../operators/source/TimestampsAndWatermarks.java  |  7 ++
 .../operators/source/WatermarkToDataOutput.java    |  4 ++
 .../api/operators/SourceOperatorTest.java          | 27 ++++++++
 .../api/operators/source/CollectingDataOutput.java |  4 +-
 .../operators/source/TestingSourceOperator.java    |  6 +-
 ...tStreamTaskChainedSourcesCheckpointingTest.java |  8 +--
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 15 ++---
 .../tasks/SourceOperatorStreamTaskTest.java        |  3 +-
 .../source/reader/TestingReaderContext.java        |  7 +-
 .../source/reader/TestingSourceMetricGroup.java    | 48 ++++++++++++++
 21 files changed, 406 insertions(+), 38 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index 9a43643..43a1efe 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -26,15 +26,15 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import 
org.apache.flink.connector.testutils.source.reader.TestingSourceMetricGroup;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
 import org.junit.After;
 import org.junit.Test;
@@ -186,8 +186,8 @@ public class FileSourceHeavyThroughputTest {
        private static final class NoOpReaderContext implements 
SourceReaderContext {
 
                @Override
-               public MetricGroup metricGroup() {
-                       return new UnregisteredMetricsGroup();
+               public SourceMetricGroup metricGroup() {
+                       return new TestingSourceMetricGroup();
                }
 
                @Override
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index f1ea255..2c5c696 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,8 +19,8 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
 
 /**
  * The class that expose some context from runtime to the {@link SourceReader}.
@@ -31,7 +31,7 @@ public interface SourceReaderContext {
        /**
         * @return The metric group this source belongs to.
         */
-       MetricGroup metricGroup();
+       SourceMetricGroup metricGroup();
 
        /**
         * Gets the configuration with which Flink was started.
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/metrics/SourceMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/metrics/SourceMetricGroup.java
new file mode 100644
index 0000000..d6fc667
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/metrics/SourceMetricGroup.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * An interface that defines the metric names of the Source (FLIP-27). The 
interface
+ * also exposes the methods for the predefined metrics that need to be updated 
by
+ * the Source implementations.
+ */
+public interface SourceMetricGroup extends MetricGroup {
+       // Source metrics convention specified in FLIP-33.
+       String CURRENT_FETCH_EVENT_TIME_LAG = "currentFetchEventTimeLag";
+       String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag";
+       String WATERMARK_LAG = "watermarkLag";
+       String SOURCE_IDLE_TIME = "sourceIdleTime";
+       String PENDING_BYTES = "pendingBytes";
+       String PENDING_RECORDS = "pendingRecords";
+       String IO_NUM_RECORDS_IN_ERRORS = "numRecordsInErrors";
+
+       /**
+        * @return the counter to record number of records received by the 
SourceReader.
+        */
+       Counter getNumRecordsInCounter();
+
+       /**
+        * @return the counter to record number of bytes received by the 
SourceReader.
+        */
+       Counter getNumBytesInCounter();
+
+       /**
+        * @return the counter to record the errors encountered when reading 
the records.
+        */
+       Counter getNumRecordsInErrorsCounter();
+
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index 518ce86..cc961eb 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
 import org.junit.Test;
@@ -106,8 +107,8 @@ public class NumberSequenceSourceTest {
        private static final class DummyReaderContext implements 
SourceReaderContext {
 
                @Override
-               public MetricGroup metricGroup() {
-                       return new UnregisteredMetricsGroup();
+               public SourceMetricGroup metricGroup() {
+                       return new DummySourceMetricsGroup();
                }
 
                @Override
@@ -168,4 +169,23 @@ public class NumberSequenceSourceTest {
                        return emittedRecords;
                }
        }
+
+       private static final class DummySourceMetricsGroup
+               extends UnregisteredMetricsGroup implements SourceMetricGroup {
+
+               @Override
+               public Counter getNumRecordsInCounter() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Counter getNumBytesInCounter() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Counter getNumRecordsInErrorsCounter() {
+                       throw new UnsupportedOperationException();
+               }
+       }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceMetricGroup.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceMetricGroup.java
new file mode 100644
index 0000000..0b3cae4
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceMetricGroup.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+/**
+ * A testing implementation of {@link 
org.apache.flink.api.connector.source.metrics.SourceMetricGroup}.
+ * This class is not supposed to be used by classes out of the flink-core 
module. Please use
+ * TestingSourceMetricGroup in flink-connector-test-utils instead.
+ */
+public class MockSourceMetricGroup extends UnregisteredMetricsGroup implements 
SourceMetricGroup {
+       private final Counter numRecordsInCounter = new SimpleCounter();
+       private final Counter numBytesInCounter = new SimpleCounter();
+       private final Counter numRecordsInErrorsCounter = new SimpleCounter();
+
+       @Override
+       public Counter getNumRecordsInCounter() {
+               return numRecordsInCounter;
+       }
+
+       @Override
+       public Counter getNumBytesInCounter() {
+               return numBytesInCounter;
+       }
+
+       @Override
+       public Counter getNumRecordsInErrorsCounter() {
+               return numRecordsInErrorsCounter;
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 8eb4596..38805ad 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.connector.source.mocks;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -43,6 +44,7 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
        private boolean started;
        private boolean closed;
        private boolean waitingForMoreSplits;
+       private boolean emitWatermark;
 
        @GuardedBy("this")
        private CompletableFuture<Void> availableFuture;
@@ -76,7 +78,11 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                }
                // Read from the split with available record.
                if (currentSplitIndex < assignedSplits.size()) {
-                       
sourceOutput.collect(assignedSplits.get(currentSplitIndex).getNext(false)[0]);
+                       int value = 
assignedSplits.get(currentSplitIndex).getNext(false)[0];
+                       sourceOutput.collect(value, value);
+                       if (emitWatermark) {
+                               sourceOutput.emitWatermark(new 
Watermark(value));
+                       }
                        return InputStatus.MORE_AVAILABLE;
                } else if (finished) {
                        // In case no split has available record, return 
depending on whether all the splits has finished.
@@ -153,6 +159,10 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                }
        }
 
+       public void enableWatermarkEmission() {
+               emitWatermark = true;
+       }
+
        public boolean isStarted() {
                return started;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/SourceMetricGroupImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/SourceMetricGroupImpl.java
new file mode 100644
index 0000000..d2b67bf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/SourceMetricGroupImpl.java
@@ -0,0 +1,76 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.util.function.Supplier;
+
+/**
+ * A metric group that adds source specific metrics to the SourceOperator.
+ * It adds Source specific metrics to the OperatorMetricsGroup.
+ */
+public class SourceMetricGroupImpl extends 
ProxyMetricGroup<OperatorMetricGroup> implements SourceMetricGroup {
+       private final Counter numRecordsInCounter;
+       private final Counter numRecordsInErrorsCounter;
+       private final Counter numBytesIn;
+       private final Supplier<Long> currentWatermarkSupplier;
+
+       public SourceMetricGroupImpl(
+                       OperatorMetricGroup operatorMetricGroup,
+                       Counter numBytesInCounter,
+                       Supplier<Long> currentWatermarkSupplier) {
+               super(operatorMetricGroup);
+               this.numRecordsInCounter = 
operatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+               this.numRecordsInErrorsCounter = 
operatorMetricGroup.counter(SourceMetricGroup.IO_NUM_RECORDS_IN_ERRORS);
+               this.numBytesIn = numBytesInCounter;
+               this.currentWatermarkSupplier = currentWatermarkSupplier;
+               operatorMetricGroup.gauge(
+                               SourceMetricGroup.WATERMARK_LAG,
+                               () -> {
+                                       long currentWatermark = 
currentWatermarkSupplier.get();
+                                       if (currentWatermark < 0) {
+                                               return -1L;
+                                       } else {
+                                               return 
SystemClock.getInstance().absoluteTimeMillis() - currentWatermark;
+                                       }
+                               });
+       }
+
+       @Override
+       public Counter getNumRecordsInCounter() {
+               return numRecordsInCounter;
+       }
+
+       public Counter getNumBytesInCounter() {
+               return numBytesIn;
+       }
+
+       public Counter getNumRecordsInErrorsCounter() {
+               return numRecordsInErrorsCounter;
+       }
+
+       @VisibleForTesting
+       public Supplier<Long> getCurrentWatermarkSupplier() {
+               return currentWatermarkSupplier;
+       }
+}
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index f0bd66b..94c1c16 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -90,6 +90,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 46839b3..5234805 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -28,10 +28,12 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.SourceMetricGroupImpl;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
@@ -42,10 +44,13 @@ import 
org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.FunctionWithException;
@@ -119,6 +124,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
         * but we currently need to instantiate this lazily, because the metric 
groups exist only later. */
        private TimestampsAndWatermarks<OUT> eventTimeLogic;
 
+       /** The metric group is initialized lazily at runtime. */
+       private SourceMetricGroup sourceMetricGroup;
+
        public SourceOperator(
                        FunctionWithException<SourceReaderContext, 
SourceReader<OUT, SplitT>, Exception> readerFactory,
                        OperatorEventGateway operatorEventGateway,
@@ -128,6 +136,22 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
                        Configuration configuration,
                        String localHostname,
                        boolean emitProgressiveWatermarks) {
+               // The SourceMetricGorup is set to null here because it is set 
lazily in the setup().
+               this(readerFactory, operatorEventGateway, splitSerializer, 
watermarkStrategy,
+                       timeService, configuration, localHostname, 
emitProgressiveWatermarks, null);
+       }
+
+       @VisibleForTesting
+       protected SourceOperator(
+               FunctionWithException<SourceReaderContext, SourceReader<OUT, 
SplitT>, Exception> readerFactory,
+               OperatorEventGateway operatorEventGateway,
+               SimpleVersionedSerializer<SplitT> splitSerializer,
+               WatermarkStrategy<OUT> watermarkStrategy,
+               ProcessingTimeService timeService,
+               Configuration configuration,
+               String localHostname,
+               boolean emitProgressiveWatermarks,
+               SourceMetricGroup sourceMetricGroup) {
 
                this.readerFactory = checkNotNull(readerFactory);
                this.operatorEventGateway = checkNotNull(operatorEventGateway);
@@ -137,16 +161,35 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
                this.configuration = checkNotNull(configuration);
                this.localHostname = checkNotNull(localHostname);
                this.emitProgressiveWatermarks = emitProgressiveWatermarks;
+               this.sourceMetricGroup = sourceMetricGroup;
+       }
+
+       @Override
+       public void setup(
+                       StreamTask<?, ?> containingTask,
+                       StreamConfig config,
+                       Output<StreamRecord<OUT>> output) {
+               super.setup(containingTask, config, output);
+               // Reuse the numBytesInCounter from TaskMetricGroup.
+               sourceMetricGroup = new SourceMetricGroupImpl(
+                       super.metrics,
+                       
super.metrics.parent().getIOMetricGroup().getNumBytesInCounter(),
+                       () -> {
+                               if (eventTimeLogic != null) {
+                                       return eventTimeLogic.getWatermark();
+                               } else {
+                                       return -1L;
+                               }
+                       });
        }
 
        @Override
        public void open() throws Exception {
-               final MetricGroup metricGroup = getMetricGroup();
 
                final SourceReaderContext context = new SourceReaderContext() {
                        @Override
-                       public MetricGroup metricGroup() {
-                               return metricGroup;
+                       public SourceMetricGroup metricGroup() {
+                               return sourceMetricGroup;
                        }
 
                        @Override
@@ -180,13 +223,13 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
                if (emitProgressiveWatermarks) {
                        eventTimeLogic = 
TimestampsAndWatermarks.createProgressiveEventTimeLogic(
                                        watermarkStrategy,
-                                       metricGroup,
+                                       sourceMetricGroup,
                                        getProcessingTimeService(),
                                        
getExecutionConfig().getAutoWatermarkInterval());
                } else {
                        eventTimeLogic = 
TimestampsAndWatermarks.createNoOpEventTimeLogic(
                                        watermarkStrategy,
-                                       metricGroup);
+                                       sourceMetricGroup);
                }
 
                sourceReader = readerFactory.apply(context);
@@ -218,6 +261,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
        }
 
        @Override
+       public MetricGroup getMetricGroup() {
+               return sourceMetricGroup;
+       }
+
+       @Override
        public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
                // guarding an assumptions we currently make due to the fact 
that certain classes
                // assume a constant output
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
index f28ef55..d263bd6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
@@ -64,6 +64,12 @@ public class NoOpTimestampsAndWatermarks<T> implements 
TimestampsAndWatermarks<T
                // no periodic watermarks
        }
 
+       @Override
+       public long getWatermark() {
+               // No watermark.
+               return -1L;
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
index 1d67af2..5a11a01 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
@@ -65,6 +65,9 @@ public class ProgressiveTimestampsAndWatermarks<T> implements 
TimestampsAndWater
        private StreamingReaderOutput<T> currentMainOutput;
 
        @Nullable
+       private WatermarkToDataOutput watermarkOutput;
+
+       @Nullable
        private ScheduledFuture<?> periodicEmitHandle;
 
        public ProgressiveTimestampsAndWatermarks(
@@ -98,7 +101,7 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
                // support re-assigning the underlying output
                checkState(currentMainOutput == null && currentPerSplitOutputs 
== null, "already created a main output");
 
-               final WatermarkOutput watermarkOutput = new 
WatermarkToDataOutput(output);
+               watermarkOutput = new WatermarkToDataOutput(output);
                final WatermarkGenerator<T> watermarkGenerator = 
watermarksFactory.createWatermarkGenerator(watermarksContext);
 
                currentPerSplitOutputs = new SplitLocalOutputs<>(
@@ -141,6 +144,15 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
                }
        }
 
+       @Override
+       public long getWatermark() {
+               if (watermarkOutput != null) {
+                       return watermarkOutput.getMaxWatermarkSoFar();
+               } else {
+                       return -1L;
+               }
+       }
+
        void triggerPeriodicEmit(@SuppressWarnings("unused") long 
wallClockTimestamp) {
                if (currentPerSplitOutputs != null) {
                        currentPerSplitOutputs.emitPeriodicWatermark();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
index 51068d2..d8320e8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -63,6 +63,13 @@ public interface TimestampsAndWatermarks<T> {
         */
        void stopPeriodicWatermarkEmits();
 
+       /**
+        * Get the current watermark.
+        *
+        * @return the current watermark.
+        */
+       long getWatermark();
+
        // 
------------------------------------------------------------------------
        //  factories
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
index 3e3b255..8c26d7b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -83,4 +83,8 @@ public final class WatermarkToDataOutput implements 
WatermarkOutput {
                        throw new ExceptionInChainedOperatorException(e);
                }
        }
+
+       public long getMaxWatermarkSoFar() {
+               return maxWatermarkSoFar;
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 78348d8..541d28c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.groups.SourceMetricGroupImpl;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -42,6 +43,7 @@ import 
org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
 import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
@@ -179,6 +181,31 @@ public class SourceOperatorTest {
                assertEquals(100L, (long) 
mockSourceReader.getAbortedCheckpoints().get(0));
        }
 
+       @Test
+       public void testMetrics() throws Exception {
+               StateInitializationContext stateContext = getStateContext();
+               operator.initializeState(stateContext);
+               operator.open();
+               mockSourceReader.enableWatermarkEmission();
+               try {
+                       MockSourceSplit newSplit = new MockSourceSplit(2);
+                       newSplit.addRecord(100);
+                       operator.handleOperatorEvent(new AddSplitEvent<>(
+                                       Collections.singletonList(newSplit), 
new MockSourceSplitSerializer()));
+                       CollectingDataOutput<Integer> output = new 
CollectingDataOutput<>();
+                       SourceMetricGroupImpl metrics =
+                                       (SourceMetricGroupImpl) 
operator.getMetricGroup();
+                       assertEquals(-1L, (long) 
metrics.getCurrentWatermarkSupplier().get());
+                       operator.emitNext(output);
+                       assertEquals("There should be a record and a watermark 
emission.",
+                                       2, output.events.size());
+                       assertEquals(100L, (long) 
metrics.getCurrentWatermarkSupplier().get());
+               }
+               finally {
+                       operator.close();
+               }
+       }
+
        // ---------------- helper methods -------------------------
 
        private StateInitializationContext getStateContext() throws Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
index a536553..a1a3b20 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
@@ -30,9 +30,9 @@ import java.util.List;
 /**
  * A test utility implementation of {@link PushingAsyncDataInput.DataOutput} 
that collects all events.
  */
-final class CollectingDataOutput<E> implements 
PushingAsyncDataInput.DataOutput<E> {
+public final class CollectingDataOutput<E> implements 
PushingAsyncDataInput.DataOutput<E> {
 
-       final List<Object> events = new ArrayList<>();
+       public final List<Object> events = new ArrayList<>();
 
        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 46b1382..61d19b9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.testutils.source.reader.TestingSourceMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -36,7 +37,7 @@ import 
org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 /**
  * A SourceOperator extension to simplify test setup.
  */
-public class TestingSourceOperator<T>  extends SourceOperator<T, 
MockSourceSplit> {
+public class TestingSourceOperator<T> extends SourceOperator<T, 
MockSourceSplit> {
 
        private static final long serialVersionUID = 1L;
 
@@ -80,7 +81,8 @@ public class TestingSourceOperator<T>  extends 
SourceOperator<T, MockSourceSplit
                        timeService,
                        new Configuration(),
                        "localhost",
-                       emitProgressiveWatermarks);
+                       emitProgressiveWatermarks,
+                       new TestingSourceMetricGroup());
 
                this.subtaskIndex = subtaskIndex;
                this.parallelism = parallelism;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
index 5f912ab..9e104d8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
@@ -137,9 +137,9 @@ public class 
MultipleInputStreamTaskChainedSourcesCheckpointingTest {
                        Future<Boolean> checkpointFuture = 
testHarness.getStreamTask().triggerCheckpointAsync(metaData, 
barrier.getCheckpointOptions(), false);
                        processSingleStepUntil(testHarness, 
checkpointFuture::isDone);
 
-                       expectedOutput.add(new StreamRecord<>("42", 
TimestampAssigner.NO_TIMESTAMP));
-                       expectedOutput.add(new StreamRecord<>("42", 
TimestampAssigner.NO_TIMESTAMP));
-                       expectedOutput.add(new StreamRecord<>("42", 
TimestampAssigner.NO_TIMESTAMP));
+                       expectedOutput.add(new StreamRecord<>("42", 42L));
+                       expectedOutput.add(new StreamRecord<>("42", 42L));
+                       expectedOutput.add(new StreamRecord<>("42", 42L));
                        expectedOutput.add(new StreamRecord<>("44", 
TimestampAssigner.NO_TIMESTAMP));
                        expectedOutput.add(new StreamRecord<>("44", 
TimestampAssigner.NO_TIMESTAMP));
                        expectedOutput.add(new StreamRecord<>("47.0", 
TimestampAssigner.NO_TIMESTAMP));
@@ -202,7 +202,7 @@ public class 
MultipleInputStreamTaskChainedSourcesCheckpointingTest {
 
                        addSourceRecords(testHarness, 0, 42, 43, 44);
                        processSingleStepUntil(testHarness, () -> 
!testHarness.getOutput().isEmpty());
-                       expectedOutput.add(new StreamRecord<>("42", 
TimestampAssigner.NO_TIMESTAMP));
+                       expectedOutput.add(new StreamRecord<>("42", 42L));
 
                        CheckpointBarrier barrier = createBarrier(testHarness);
                        Future<Boolean> checkpointFuture = 
testHarness.getStreamTask().triggerCheckpointAsync(metaData, 
barrier.getCheckpointOptions(), false);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index f3b963a..9259750 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -113,8 +112,8 @@ public class MultipleInputStreamTaskTest {
                        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 
                        addSourceRecords(testHarness, 1, 42, 43);
-                       expectedOutput.add(new StreamRecord<>("42", 
TimestampAssigner.NO_TIMESTAMP));
-                       expectedOutput.add(new StreamRecord<>("43", 
TimestampAssigner.NO_TIMESTAMP));
+                       expectedOutput.add(new StreamRecord<>("42", 42));
+                       expectedOutput.add(new StreamRecord<>("43", 43));
                        testHarness.processElement(new StreamRecord<>("Hello", 
initialTime + 1), 0);
                        expectedOutput.add(new StreamRecord<>("Hello", 
initialTime + 1));
                        testHarness.processElement(new StreamRecord<>(42.44d, 
initialTime + 3), 1);
@@ -419,7 +418,7 @@ public class MultipleInputStreamTaskTest {
                        testHarness.processElement(new Watermark(initialTime), 
0, 1);
 
                        addSourceRecords(testHarness, 1, initialTime);
-                       expectedOutput.add(new StreamRecord<>("" + 
(initialTime), TimestampAssigner.NO_TIMESTAMP));
+                       expectedOutput.add(new StreamRecord<>("" + 
(initialTime), initialTime));
 
                        testHarness.processElement(new Watermark(initialTime), 
1, 0);
 
@@ -443,7 +442,7 @@ public class MultipleInputStreamTaskTest {
                        testHarness.processElement(new Watermark(initialTime + 
3), 0, 1);
 
                        addSourceRecords(testHarness, 1, initialTime + 3);
-                       expectedOutput.add(new StreamRecord<>("" + (initialTime 
+ 3), TimestampAssigner.NO_TIMESTAMP));
+                       expectedOutput.add(new StreamRecord<>("" + (initialTime 
+ 3), initialTime + 3));
 
                        testHarness.processElement(new Watermark(initialTime + 
3), 1, 0);
                        testHarness.processElement(new Watermark(initialTime + 
2), 1, 1);
@@ -463,7 +462,7 @@ public class MultipleInputStreamTaskTest {
                        testHarness.processElement(new Watermark(initialTime + 
4), 0, 1);
 
                        addSourceRecords(testHarness, 1, initialTime + 4);
-                       expectedOutput.add(new StreamRecord<>("" + (initialTime 
+ 4), TimestampAssigner.NO_TIMESTAMP));
+                       expectedOutput.add(new StreamRecord<>("" + (initialTime 
+ 4), initialTime + 4));
 
                        testHarness.processElement(new Watermark(initialTime + 
4), 1, 0);
                        expectedOutput.add(new Watermark(initialTime + 4));
@@ -518,7 +517,7 @@ public class MultipleInputStreamTaskTest {
                                // we wake up the source and emit watermark
                                addSourceRecords(testHarness, 1, initialTime + 
5);
                                testHarness.processAll();
-                               expectedOutput.add(new StreamRecord<>("" + 
(initialTime + 5), TimestampAssigner.NO_TIMESTAMP));
+                               expectedOutput.add(new StreamRecord<>("" + 
(initialTime + 5), initialTime + 5));
                                expectedOutput.add(new Watermark(initialTime + 
5));
                                // the source should go back to being idle 
immediately, but AbstractStreamOperatorV2
                                // should have updated it's watermark by then.
@@ -534,7 +533,7 @@ public class MultipleInputStreamTaskTest {
 
                        // make source active once again, emit a watermark and 
go idle again.
                        addSourceRecords(testHarness, 1, initialTime + 10);
-                       expectedOutput.add(new StreamRecord<>("" + (initialTime 
+ 10), TimestampAssigner.NO_TIMESTAMP));
+                       expectedOutput.add(new StreamRecord<>("" + (initialTime 
+ 10), initialTime + 10));
                        expectedOutput.add(StreamStatus.ACTIVE);
                        expectedOutput.add(StreamStatus.IDLE);
                        testHarness.processAll();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index ca2ac10..5be45d4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -143,7 +142,7 @@ public class SourceOperatorStreamTaskTest {
 
                        // Build expected output to verify the results
                        Queue<Object> expectedOutput = new LinkedList<>();
-                       expectedRecords.forEach(r -> expectedOutput.offer(new 
StreamRecord<>(r, TimestampAssigner.NO_TIMESTAMP)));
+                       expectedRecords.forEach(r -> expectedOutput.offer(new 
StreamRecord<>(r, r)));
                        // Add barrier to the expected output.
                        expectedOutput.add(new CheckpointBarrier(checkpointId, 
checkpointId, checkpointOptions));
 
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
index 1fda64a..50df6a3 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
@@ -20,9 +20,8 @@ package org.apache.flink.connector.testutils.source.reader;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -32,7 +31,7 @@ import java.util.List;
  */
 public class TestingReaderContext implements SourceReaderContext {
 
-       private final UnregisteredMetricsGroup metrics = new 
UnregisteredMetricsGroup();
+       private final TestingSourceMetricGroup metrics = new 
TestingSourceMetricGroup();
 
        private final Configuration config;
 
@@ -51,7 +50,7 @@ public class TestingReaderContext implements 
SourceReaderContext {
        // 
------------------------------------------------------------------------
 
        @Override
-       public MetricGroup metricGroup() {
+       public SourceMetricGroup metricGroup() {
                return metrics;
        }
 
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingSourceMetricGroup.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingSourceMetricGroup.java
new file mode 100644
index 0000000..3dc33c2
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingSourceMetricGroup.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testutils.source.reader;
+
+import org.apache.flink.api.connector.source.metrics.SourceMetricGroup;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+/**
+ * A testing implementation of {@link SourceMetricGroup}.
+ */
+public class TestingSourceMetricGroup extends UnregisteredMetricsGroup 
implements SourceMetricGroup {
+       private final Counter numRecordsInCounter = new SimpleCounter();
+       private final Counter numBytesInCounter = new SimpleCounter();
+       private final Counter numRecordsInErrorsCounter = new SimpleCounter();
+
+       @Override
+       public Counter getNumRecordsInCounter() {
+               return numRecordsInCounter;
+       }
+
+       @Override
+       public Counter getNumBytesInCounter() {
+               return numBytesInCounter;
+       }
+
+       @Override
+       public Counter getNumRecordsInErrorsCounter() {
+               return numRecordsInErrorsCounter;
+       }
+}

Reply via email to