This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3c314451bbc8ee3e00341e47a2ba58697f6d07f3 Author: Stephan Ewen <[email protected]> AuthorDate: Tue Sep 1 17:19:52 2020 +0200 [FLINK-19162][connectors] Add 'recycle()' to the RecordsWithSplitIds to support reuse of heavy objects. --- .../base/source/reader/RecordsWithSplitIds.java | 9 +++ .../base/source/reader/SourceReaderBase.java | 1 + .../base/source/reader/SplitsRecordIterator.java | 14 ++-- .../base/source/reader/SourceReaderBaseTest.java | 79 ++++++++++++++++++++++ 4 files changed, 98 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java index f616125..dc915b3 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java @@ -47,4 +47,13 @@ public interface RecordsWithSplitIds<E> { * @return the finished splits after this RecordsWithSplitIds is returned. */ Set<String> finishedSplits(); + + /** + * This method is called when all records from this batch have been emitted. + * + * <p>Overriding this method gives implementations the opportunity to recycle/reuse this object, + * which is a performance optimization that is important for cases where the record objects are + * large or otherwise heavy to allocate. + */ + default void recycle() {} } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index e01180e..4a41d49 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -149,6 +149,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt }); // Handle the finished splits. onSplitFinished(splitIter.finishedSplitIds()); + splitIter.dispose(); // Prepare the return status based on the availability of the next element. status = finishedOrAvailableLater(); } else { diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java index d7b7b76..c83bec0 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java @@ -28,8 +28,8 @@ import java.util.Set; * A class that wraps around a {@link RecordsWithSplitIds} and provide a consistent iterator. */ public class SplitsRecordIterator<E> { - private final Map<String, Collection<E>> recordsBySplits; - private final Set<String> finishedSplitIds; + + private final RecordsWithSplitIds<E> recordsWithSplitIds; private final Iterator<Map.Entry<String, Collection<E>>> splitIter; private String currentSplitId; private Iterator<E> recordsIter; @@ -40,11 +40,11 @@ public class SplitsRecordIterator<E> { * @param recordsWithSplitIds the records by splits. */ public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) { - this.recordsBySplits = recordsWithSplitIds.recordsBySplits(); + this.recordsWithSplitIds = recordsWithSplitIds; + Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits(); // Remove empty splits; recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty()); this.splitIter = recordsBySplits.entrySet().iterator(); - this.finishedSplitIds = recordsWithSplitIds.finishedSplits(); } /** @@ -91,6 +91,10 @@ public class SplitsRecordIterator<E> { * @return a set of finished split Ids. */ public Set<String> finishedSplitIds() { - return finishedSplitIds; + return recordsWithSplitIds.finishedSplits(); + } + + public void dispose() { + recordsWithSplitIds.recycle(); } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index a332efe..0ec4297 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -19,10 +19,17 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader; import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader; +import org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter; +import org.apache.flink.connector.base.source.reader.mocks.TestingReaderContext; +import org.apache.flink.connector.base.source.reader.mocks.TestingReaderOutput; +import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit; +import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; @@ -33,10 +40,14 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Queue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * A unit test class for {@link SourceReaderBase}. */ @@ -89,6 +100,29 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } } + @Test + public void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception { + final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2"); + final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records); + + reader.pollNext(new TestingReaderOutput<>()); + + assertFalse(records.isRecycled()); + } + + @Test + public void testRecordsWithSplitsRecycledWhenEmpty() throws Exception { + final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2"); + final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records); + + // poll thrice: twice to get all records, one more to trigger recycle and moving to the next split + reader.pollNext(new TestingReaderOutput<>()); + reader.pollNext(new TestingReaderOutput<>()); + reader.pollNext(new TestingReaderOutput<>()); + + assertTrue(records.isRecycled()); + } + // ---------------- helper methods ----------------- @Override @@ -140,4 +174,49 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); return config; } + + // ------------------------------------------------------------------------ + // Testing Setup Helpers + // ------------------------------------------------------------------------ + + private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable( + final String splitId, + final RecordsWithSplitIds<E> records) throws Exception { + + final FutureNotifier futureNotifier = new FutureNotifier(); + final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue = + new FutureCompletingBlockingQueue<>(futureNotifier); + + final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>( + futureNotifier, + elementsQueue, + () -> new TestingSplitReader<E, TestingSourceSplit>(records), + new PassThroughRecordEmitter<E, TestingSourceSplit>(), + new Configuration(), + new TestingReaderContext()) { + + @Override + protected void onSplitFinished(Collection<String> finishedSplitIds) { + } + + @Override + protected TestingSourceSplit initializedState(TestingSourceSplit split) { + return split; + } + + @Override + protected TestingSourceSplit toSplitType(String splitId, TestingSourceSplit splitState) { + return splitState; + } + }; + + reader.start(); + + final List<TestingSourceSplit> splits = Collections.singletonList(new TestingSourceSplit(splitId)); + reader.addSplits(splits); + + reader.isAvailable().get(); + + return reader; + } }
