This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch FLINK-19743
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4cb77755791ae9885e969f595011bb8f9d8a6d88
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Fri Nov 13 11:28:21 2020 +0800

    [FLINK-19743][connectors/common] Report numRecordsIn and 
numRecordsInPerSecond in SourceReaderBase.
---
 .../base/source/reader/SourceReaderBase.java       |  8 +++-
 .../base/source/reader/SourceReaderBaseTest.java   | 48 ++++++++++++++++++----
 2 files changed, 47 insertions(+), 9 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 0a7a742..4355fb3 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.Counter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +78,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        protected final Configuration config;
 
        /** The context of this source reader. */
-       protected SourceReaderContext context;
+       protected final SourceReaderContext context;
+
+       /** A counter that records the number of records consumed by this 
source reader. */
+       private final Counter numRecordsInCounter;
 
        /** The latest fetched batch of records-by-split from the split reader. 
*/
        @Nullable private RecordsWithSplitIds<E> currentFetch;
@@ -100,6 +104,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                this.options = new SourceReaderOptions(config);
                this.config = config;
                this.context = context;
+               this.numRecordsInCounter = 
context.metricGroup().getNumRecordsInCounter();
                this.noMoreSplitsAssignment = false;
        }
 
@@ -123,6 +128,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                        final E record = 
recordsWithSplitId.nextRecordFromSplit();
                        if (record != null) {
                                // emit the record.
+                               numRecordsInCounter.inc();
                                recordEmitter.emitRecord(record, 
currentSplitOutput, currentSplitContext.state);
                                LOG.trace("Emitted record: {}", record);
 
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 5d5bc20..9ab014d 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
@@ -90,7 +92,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                                public void close() {}
                        },
                        getConfig(),
-                       null)) {
+                       new TestingReaderContext())) {
                        ValidatingSourceOutput output = new 
ValidatingSourceOutput();
                        reader.addSplits(Collections.singletonList(getSplit(0,
                                NUM_RECORDS_PER_SPLIT,
@@ -130,6 +132,25 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
        }
 
        @Test
+       public void testMetrics() throws Exception {
+               final TestingRecordsWithSplitIds<String> records = new 
TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
+               final SourceReaderContext context = new TestingReaderContext();
+               final SourceReader<String, ?> reader = 
createReaderAndAwaitAvailable(
+                       "test-split",
+                       records,
+                       new PassThroughRecordEmitter<>(),
+                       context);
+
+               ReaderOutput<String> readerOutput = new TestingReaderOutput<>();
+               assertEquals(0, 
context.metricGroup().getNumRecordsInCounter().getCount());
+               reader.pollNext(readerOutput);
+               assertEquals(1, 
context.metricGroup().getNumRecordsInCounter().getCount());
+               reader.pollNext(readerOutput);
+               assertEquals(InputStatus.NOTHING_AVAILABLE, 
reader.pollNext(readerOutput));
+               assertEquals(2, 
context.metricGroup().getNumRecordsInCounter().getCount());
+       }
+
+       @Test
        public void testMultipleSplitsWithDifferentFinishingMoments() throws 
Exception {
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
                        new FutureCompletingBlockingQueue<>();
@@ -142,7 +163,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        elementsQueue,
                        () -> mockSplitReader,
                        getConfig(),
-                       null);
+                       new TestingReaderContext());
 
                reader.start();
 
@@ -176,7 +197,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        elementsQueue,
                        () -> mockSplitReader,
                        getConfig(),
-                       null);
+                       new TestingReaderContext());
 
                reader.start();
 
@@ -211,7 +232,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        elementsQueue,
                        splitFetcherManager,
                        getConfig(),
-                       null);
+                       new TestingReaderContext());
 
                // Create and add a split that only contains one record
                final MockSourceSplit split = new MockSourceSplit(0, 0, 1);
@@ -236,7 +257,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        elementsQueue,
                        () -> mockSplitReader,
                        getConfig(),
-                       null);
+                       new TestingReaderContext());
        }
 
        @Override
@@ -281,16 +302,27 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
        private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
                final String splitId,
                final RecordsWithSplitIds<E> records) throws Exception {
+               return createReaderAndAwaitAvailable(
+                       splitId, records, new PassThroughRecordEmitter<>(), new 
TestingReaderContext());
+       }
+
+       private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
+               final String splitId,
+               final RecordsWithSplitIds<E> records,
+               final RecordEmitter<E, E, TestingSourceSplit> recordEmitter,
+               final SourceReaderContext context) throws Exception {
 
                final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue =
                        new FutureCompletingBlockingQueue<>();
 
                final SourceReader<E, TestingSourceSplit> reader = new 
SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, 
TestingSourceSplit>(
                        elementsQueue,
-                       () -> new TestingSplitReader<>(records),
-                       new PassThroughRecordEmitter<>(),
+                       new SingleThreadFetcherManager<>(
+                               elementsQueue,
+                               () -> new TestingSplitReader<>(records)),
+                       recordEmitter,
                        new Configuration(),
-                       new TestingReaderContext()) {
+                       context) {
 
                        @Override
                        public void notifyCheckpointComplete(long checkpointId) 
{}

Reply via email to