This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch FLINK-19743 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4cb77755791ae9885e969f595011bb8f9d8a6d88 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Fri Nov 13 11:28:21 2020 +0800 [FLINK-19743][connectors/common] Report numRecordsIn and numRecordsInPerSecond in SourceReaderBase. --- .../base/source/reader/SourceReaderBase.java | 8 +++- .../base/source/reader/SourceReaderBaseTest.java | 48 ++++++++++++++++++---- 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 0a7a742..4355fb3 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -29,6 +29,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.core.io.InputStatus; +import org.apache.flink.metrics.Counter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt protected final Configuration config; /** The context of this source reader. */ - protected SourceReaderContext context; + protected final SourceReaderContext context; + + /** A counter that records the number of records consumed by this source reader. */ + private final Counter numRecordsInCounter; /** The latest fetched batch of records-by-split from the split reader. */ @Nullable private RecordsWithSplitIds<E> currentFetch; @@ -100,6 +104,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt this.options = new SourceReaderOptions(config); this.config = config; this.context = context; + this.numRecordsInCounter = context.metricGroup().getNumRecordsInCounter(); this.noMoreSplitsAssignment = false; } @@ -123,6 +128,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt final E record = recordsWithSplitId.nextRecordFromSplit(); if (record != null) { // emit the record. + numRecordsInCounter.inc(); recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state); LOG.trace("Emitted record: {}", record); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 5d5bc20..9ab014d 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -19,7 +19,9 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.configuration.Configuration; @@ -90,7 +92,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> public void close() {} }, getConfig(), - null)) { + new TestingReaderContext())) { ValidatingSourceOutput output = new ValidatingSourceOutput(); reader.addSplits(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, @@ -130,6 +132,25 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } @Test + public void testMetrics() throws Exception { + final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2"); + final SourceReaderContext context = new TestingReaderContext(); + final SourceReader<String, ?> reader = createReaderAndAwaitAvailable( + "test-split", + records, + new PassThroughRecordEmitter<>(), + context); + + ReaderOutput<String> readerOutput = new TestingReaderOutput<>(); + assertEquals(0, context.metricGroup().getNumRecordsInCounter().getCount()); + reader.pollNext(readerOutput); + assertEquals(1, context.metricGroup().getNumRecordsInCounter().getCount()); + reader.pollNext(readerOutput); + assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(readerOutput)); + assertEquals(2, context.metricGroup().getNumRecordsInCounter().getCount()); + } + + @Test public void testMultipleSplitsWithDifferentFinishingMoments() throws Exception { FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>(); @@ -142,7 +163,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> elementsQueue, () -> mockSplitReader, getConfig(), - null); + new TestingReaderContext()); reader.start(); @@ -176,7 +197,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> elementsQueue, () -> mockSplitReader, getConfig(), - null); + new TestingReaderContext()); reader.start(); @@ -211,7 +232,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> elementsQueue, splitFetcherManager, getConfig(), - null); + new TestingReaderContext()); // Create and add a split that only contains one record final MockSourceSplit split = new MockSourceSplit(0, 0, 1); @@ -236,7 +257,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> elementsQueue, () -> mockSplitReader, getConfig(), - null); + new TestingReaderContext()); } @Override @@ -281,16 +302,27 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable( final String splitId, final RecordsWithSplitIds<E> records) throws Exception { + return createReaderAndAwaitAvailable( + splitId, records, new PassThroughRecordEmitter<>(), new TestingReaderContext()); + } + + private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable( + final String splitId, + final RecordsWithSplitIds<E> records, + final RecordEmitter<E, E, TestingSourceSplit> recordEmitter, + final SourceReaderContext context) throws Exception { final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue = new FutureCompletingBlockingQueue<>(); final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>( elementsQueue, - () -> new TestingSplitReader<>(records), - new PassThroughRecordEmitter<>(), + new SingleThreadFetcherManager<>( + elementsQueue, + () -> new TestingSplitReader<>(records)), + recordEmitter, new Configuration(), - new TestingReaderContext()) { + context) { @Override public void notifyCheckpointComplete(long checkpointId) {}
