This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c314451bbc8ee3e00341e47a2ba58697f6d07f3
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Sep 1 17:19:52 2020 +0200

    [FLINK-19162][connectors] Add 'recycle()' to the RecordsWithSplitIds to 
support reuse of heavy objects.
---
 .../base/source/reader/RecordsWithSplitIds.java    |  9 +++
 .../base/source/reader/SourceReaderBase.java       |  1 +
 .../base/source/reader/SplitsRecordIterator.java   | 14 ++--
 .../base/source/reader/SourceReaderBaseTest.java   | 79 ++++++++++++++++++++++
 4 files changed, 98 insertions(+), 5 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index f616125..dc915b3 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -47,4 +47,13 @@ public interface RecordsWithSplitIds<E> {
         * @return the finished splits after this RecordsWithSplitIds is 
returned.
         */
        Set<String> finishedSplits();
+
+       /**
+        * This method is called when all records from this batch have been 
emitted.
+        *
+        * <p>Overriding this method gives implementations the opportunity to 
recycle/reuse this object,
+        * which is a performance optimization that is important for cases 
where the record objects are
+        * large or otherwise heavy to allocate.
+        */
+       default void recycle() {}
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index e01180e..4a41d49 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -149,6 +149,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                                });
                                // Handle the finished splits.
                                onSplitFinished(splitIter.finishedSplitIds());
+                               splitIter.dispose();
                                // Prepare the return status based on the 
availability of the next element.
                                status = finishedOrAvailableLater();
                        } else {
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
index d7b7b76..c83bec0 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
@@ -28,8 +28,8 @@ import java.util.Set;
  * A class that wraps around a {@link RecordsWithSplitIds} and provide a 
consistent iterator.
  */
 public class SplitsRecordIterator<E> {
-       private final Map<String, Collection<E>> recordsBySplits;
-       private final Set<String> finishedSplitIds;
+
+       private final RecordsWithSplitIds<E> recordsWithSplitIds;
        private final Iterator<Map.Entry<String, Collection<E>>> splitIter;
        private String currentSplitId;
        private Iterator<E> recordsIter;
@@ -40,11 +40,11 @@ public class SplitsRecordIterator<E> {
         * @param recordsWithSplitIds the records by splits.
         */
        public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) 
{
-               this.recordsBySplits = recordsWithSplitIds.recordsBySplits();
+               this.recordsWithSplitIds = recordsWithSplitIds;
+               Map<String, Collection<E>> recordsBySplits = 
recordsWithSplitIds.recordsBySplits();
                // Remove empty splits;
                recordsBySplits.entrySet().removeIf(e -> 
e.getValue().isEmpty());
                this.splitIter = recordsBySplits.entrySet().iterator();
-               this.finishedSplitIds = recordsWithSplitIds.finishedSplits();
        }
 
        /**
@@ -91,6 +91,10 @@ public class SplitsRecordIterator<E> {
         * @return a set of finished split Ids.
         */
        public Set<String> finishedSplitIds() {
-               return finishedSplitIds;
+               return recordsWithSplitIds.finishedSplits();
+       }
+
+       public void dispose() {
+               recordsWithSplitIds.recycle();
        }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index a332efe..0ec4297 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -19,10 +19,17 @@
 package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import 
org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter;
+import 
org.apache.flink.connector.base.source.reader.mocks.TestingReaderContext;
+import org.apache.flink.connector.base.source.reader.mocks.TestingReaderOutput;
+import 
org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
@@ -33,10 +40,14 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * A unit test class for {@link SourceReaderBase}.
  */
@@ -89,6 +100,29 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                }
        }
 
+       @Test
+       public void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws 
Exception {
+               final TestingRecordsWithSplitIds<String> records = new 
TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
+               final SourceReader<?, ?> reader = 
createReaderAndAwaitAvailable("test-split", records);
+
+               reader.pollNext(new TestingReaderOutput<>());
+
+               assertFalse(records.isRecycled());
+       }
+
+       @Test
+       public void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
+               final TestingRecordsWithSplitIds<String> records = new 
TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
+               final SourceReader<?, ?> reader = 
createReaderAndAwaitAvailable("test-split", records);
+
+               // poll thrice: twice to get all records, one more to trigger 
recycle and moving to the next split
+               reader.pollNext(new TestingReaderOutput<>());
+               reader.pollNext(new TestingReaderOutput<>());
+               reader.pollNext(new TestingReaderOutput<>());
+
+               assertTrue(records.isRecycled());
+       }
+
        // ---------------- helper methods -----------------
 
        @Override
@@ -140,4 +174,49 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 
30000L);
                return config;
        }
+
+       // 
------------------------------------------------------------------------
+       //  Testing Setup Helpers
+       // 
------------------------------------------------------------------------
+
+       private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
+               final String splitId,
+               final RecordsWithSplitIds<E> records) throws Exception {
+
+               final FutureNotifier futureNotifier = new FutureNotifier();
+               final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue =
+                       new FutureCompletingBlockingQueue<>(futureNotifier);
+
+               final SourceReader<E, TestingSourceSplit> reader = new 
SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, 
TestingSourceSplit>(
+                       futureNotifier,
+                       elementsQueue,
+                       () -> new TestingSplitReader<E, 
TestingSourceSplit>(records),
+                       new PassThroughRecordEmitter<E, TestingSourceSplit>(),
+                       new Configuration(),
+                       new TestingReaderContext()) {
+
+                       @Override
+                       protected void onSplitFinished(Collection<String> 
finishedSplitIds) {
+                       }
+
+                       @Override
+                       protected TestingSourceSplit 
initializedState(TestingSourceSplit split) {
+                               return split;
+                       }
+
+                       @Override
+                       protected TestingSourceSplit toSplitType(String 
splitId, TestingSourceSplit splitState) {
+                               return splitState;
+                       }
+               };
+
+               reader.start();
+
+               final List<TestingSourceSplit> splits = 
Collections.singletonList(new TestingSourceSplit(splitId));
+               reader.addSplits(splits);
+
+               reader.isAvailable().get();
+
+               return reader;
+       }
 }

Reply via email to the mailing list.