This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8733d7d6f21f5c54716e594e9e7f9c950c06be5
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 7 00:20:38 2020 +0200

    [FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight.
    
    This turns the RecordsWithSplitIds structure from a holder of materialized collections
    into a simple iterator-like structure, to allow for lazy materialization and object reuse.
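
    For illustration, consuming a batch through the reworked interface becomes a
    nested loop over splits and records. A minimal sketch, assuming a hypothetical
    'handle' callback (a java.util.function.BiConsumer) that is not part of this
    change:

        static <E> void drainBatch(RecordsWithSplitIds<E> batch, BiConsumer<String, E> handle) {
            String splitId;
            // nextSplit() positions the batch on the next split; null means no splits are left
            while ((splitId = batch.nextSplit()) != null) {
                E record;
                // records are handed out lazily, one at a time, instead of via a materialized map
                while ((record = batch.nextRecordFromSplit()) != null) {
                    handle.accept(splitId, record);
                }
            }
            // recycle() hands the batch back for object reuse
            batch.recycle();
        }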
---
 .../base/source/reader/RecordsBySplits.java        | 179 +++++++++++++--------
 .../base/source/reader/RecordsWithSplitIds.java    |  20 +--
 .../base/source/reader/SourceReaderBase.java       | 117 +++++++++-----
 .../base/source/reader/SplitsRecordIterator.java   | 100 ------------
 .../source/reader/fetcher/SplitFetcherTest.java    |  10 +-
 .../base/source/reader/mocks/MockSplitReader.java  |   5 +-
 .../reader/mocks/TestingRecordsWithSplitIds.java   |  32 +---
 7 files changed, 212 insertions(+), 251 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
index 77cb594..0b15432 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
@@ -20,83 +20,59 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An implementation of RecordsWithSplitIds to host all the records by splits.
  */
 public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
-       private Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
-       private Set<String> finishedSplits = new HashSet<>();
 
-       /**
-        * Add the record from the given split ID.
-        *
-        * @param splitId the split ID the record was from.
-        * @param record the record to add.
-        */
-       public void add(String splitId, E record) {
-               recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
-       }
+       private final Set<String> finishedSplits;
 
-       /**
-        * Add the record from the given source split.
-        *
-        * @param split the source split the record was from.
-        * @param record the record to add.
-        */
-       public void add(SourceSplit split, E record) {
-               add(split.splitId(), record);
-       }
+       private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator;
 
-       /**
-        * Add multiple records from the given split ID.
-        *
-        * @param splitId the split ID given the records were from.
-        * @param records the records to add.
-        */
-       public void addAll(String splitId, Collection<E> records) {
-               this.recordsBySplits.compute(splitId, (id, r) -> {
-                       if (r == null) {
-                               r = records;
-                       } else {
-                               r.addAll(records);
-                       }
-                       return r;
-               });
-       }
+       @Nullable
+       private Iterator<E> recordsInCurrentSplit;
 
-       /**
-        * Add multiple records from the given source split.
-        *
-        * @param split the source split the records were from.
-        * @param records the records to add.
-        */
-       public void addAll(SourceSplit split, Collection<E> records) {
-               addAll(split.splitId(), records);
+       public RecordsBySplits(
+                       final Map<String, Collection<E>> recordsBySplit,
+                       final Set<String> finishedSplits) {
+
+               this.splitsIterator = checkNotNull(recordsBySplit, "recordsBySplit").entrySet().iterator();
+               this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits");
        }
 
-       /**
-        * Mark the split with the given ID as finished.
-        *
-        * @param splitId the ID of the finished split.
-        */
-       public void addFinishedSplit(String splitId) {
-               finishedSplits.add(splitId);
+       @Nullable
+       @Override
+       public String nextSplit() {
+               if (splitsIterator.hasNext()) {
+                       final Map.Entry<String, Collection<E>> next = splitsIterator.next();
+                       recordsInCurrentSplit = next.getValue().iterator();
+                       return next.getKey();
+               } else {
+                       return null;
+               }
        }
 
-       /**
-        * Mark multiple splits with the given IDs as finished.
-        *
-        * @param splitIds the IDs of the finished splits.
-        */
-       public void addFinishedSplits(Collection<String> splitIds) {
-               finishedSplits.addAll(splitIds);
+       @Nullable
+       @Override
+       public E nextRecordFromSplit() {
+               if (recordsInCurrentSplit == null) {
+                       throw new IllegalStateException();
+               }
+
+               return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next() : null;
        }
 
        @Override
@@ -104,13 +80,86 @@ public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
                return finishedSplits;
        }
 
-       @Override
-       public Collection<String> splitIds() {
-               return recordsBySplits.keySet();
-       }
+       // ------------------------------------------------------------------------
 
-       @Override
-       public Map<String, Collection<E>> recordsBySplits() {
-               return recordsBySplits;
+       /**
+        * A utility builder to collect records in individual calls, rather than put a finished collection
+        * in the {@link RecordsBySplits#RecordsBySplits(Map, Set)} constructor.
+        */
+       public static class Builder<E> {
+
+               private final Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
+               private final Set<String> finishedSplits = new HashSet<>(2);
+
+               /**
+                * Add the record from the given split ID.
+                *
+                * @param splitId the split ID the record was from.
+                * @param record the record to add.
+                */
+               public void add(String splitId, E record) {
+                       recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
+               }
+
+               /**
+                * Add the record from the given source split.
+                *
+                * @param split the source split the record was from.
+                * @param record the record to add.
+                */
+               public void add(SourceSplit split, E record) {
+                       add(split.splitId(), record);
+               }
+
+               /**
+                * Add multiple records from the given split ID.
+                *
+                * @param splitId the split ID given the records were from.
+                * @param records the records to add.
+                */
+               public void addAll(String splitId, Collection<E> records) {
+                       this.recordsBySplits.compute(splitId, (id, r) -> {
+                               if (r == null) {
+                                       r = records;
+                               } else {
+                                       r.addAll(records);
+                               }
+                               return r;
+                       });
+               }
+
+               /**
+                * Add multiple records from the given source split.
+                *
+                * @param split the source split the records were from.
+                * @param records the records to add.
+                */
+               public void addAll(SourceSplit split, Collection<E> records) {
+                       addAll(split.splitId(), records);
+               }
+
+               /**
+                * Mark the split with the given ID as finished.
+                *
+                * @param splitId the ID of the finished split.
+                */
+               public void addFinishedSplit(String splitId) {
+                       finishedSplits.add(splitId);
+               }
+
+               /**
+                * Mark multiple splits with the given IDs as finished.
+                *
+                * @param splitIds the IDs of the finished splits.
+                */
+               public void addFinishedSplits(Collection<String> splitIds) {
+                       finishedSplits.addAll(splitIds);
+               }
+
+               public RecordsBySplits<E> build() {
+                       return new RecordsBySplits<>(
+                               recordsBySplits.isEmpty() ? Collections.emptyMap() : recordsBySplits,
+                               finishedSplits.isEmpty() ? Collections.emptySet() : finishedSplits);
+               }
        }
 }
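
For reference, producers now assemble a batch through the new Builder rather than
mutating a RecordsBySplits directly, as MockSplitReader does further down in this
commit. A minimal sketch; the split IDs and record values are made up:

    RecordsBySplits.Builder<String> builder = new RecordsBySplits.Builder<>();
    builder.add("split-0", "record-a");                  // single record for one split
    builder.addAll("split-1", Arrays.asList("b", "c"));  // bulk add for another split
    builder.addFinishedSplit("split-1");                 // mark that split as exhausted
    RecordsBySplits<String> batch = builder.build();     // yields the iterator-style batch
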
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index dc915b3..0c8fd07 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader;
 
-import java.util.Collection;
-import java.util.Map;
+import javax.annotation.Nullable;
+
 import java.util.Set;
 
 /**
@@ -28,18 +28,18 @@ import java.util.Set;
 public interface RecordsWithSplitIds<E> {
 
        /**
-        * Get all the split ids.
-        *
-        * @return a collection of split ids.
+        * Moves to the next split. This method is also called initially to move to the
+        * first split. Returns null if no splits are left.
         */
-       Collection<String> splitIds();
+       @Nullable
+       String nextSplit();
 
        /**
-        * Get all the records by Splits.
-        *
-        * @return a mapping from split ids to the records.
+        * Gets the next record from the current split. Returns null if no more records are left
+        * in this split.
         */
-       Map<String, Collection<E>> recordsBySplits();
+       @Nullable
+       E nextRecordFromSplit();
 
        /**
         * Get the finished splits.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 4a41d49..02b7a7c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -35,14 +35,19 @@ import org.apache.flink.core.io.InputStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
 * An abstract implementation of {@link SourceReader} which provides some synchronization between
 * the mail box main thread and the SourceReader internal threads. This class allows users to
@@ -81,8 +86,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
        /** The context of this source reader. */
        protected SourceReaderContext context;
 
-       /** The last element to ensure it is fully handled. */
-       private SplitsRecordIterator<E> splitIter;
+       /** The latest fetched batch of records-by-split from the split reader. */
+       @Nullable private RecordsWithSplitIds<E> currentFetch;
+       @Nullable private SplitContext<T, SplitStateT> currentSplitContext;
+       @Nullable private SourceOutput<T> currentSplitOutput;
 
        /** Indicating whether the SourceReader will be assigned more splits or not.*/
        private boolean noMoreSplitsAssignment;
@@ -99,7 +106,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
                this.splitFetcherManager = splitFetcherManager;
                this.recordEmitter = recordEmitter;
                this.splitStates = new HashMap<>();
-               this.splitIter = null;
                this.options = new SourceReaderOptions(config);
                this.config = config;
                this.context = context;
@@ -107,60 +113,87 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
        }
 
        @Override
-       public void start() {
-
-       }
+       public void start() {}
 
        @Override
        public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
                splitFetcherManager.checkErrors();
-               // poll from the queue if the last element was successfully handled. Otherwise
-               // just pass the last element again.
-               RecordsWithSplitIds<E> recordsWithSplitId = null;
-               boolean newFetch = splitIter == null || !splitIter.hasNext();
-               if (newFetch) {
-                       recordsWithSplitId = elementsQueue.poll();
+
+               // make sure we have a fetch we are working on, or move to the next
+               final RecordsWithSplitIds<E> recordsWithSplitId = getCurrentOrNewFetch(output);
+               if (recordsWithSplitId == null) {
+                       return trace(finishedOrAvailableLater());
                }
 
-               InputStatus status;
-               if (newFetch && recordsWithSplitId == null) {
-                       // No element available, set to available later if needed.
-                       status = finishedOrAvailableLater();
-               } else {
-                       // Update the record iterator if it is a new fetch.
-                       if (newFetch) {
-                               splitIter = new SplitsRecordIterator<>(recordsWithSplitId);
-                       }
+               // we need to loop here, because we may have to go across splits
+               while (true) {
                        // Process one record.
-                       if (splitIter.hasNext()) {
+                       final E record = recordsWithSplitId.nextRecordFromSplit();
+                       if (record != null) {
                                // emit the record.
-                               final E record = splitIter.next();
-                               final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
-                               final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
-                               recordEmitter.emitRecord(record, splitOutput, splitContext.state);
+                               recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
                                LOG.trace("Emitted record: {}", record);
+                               return trace(InputStatus.MORE_AVAILABLE);
                        }
-                       // Do some cleanup if the all the records in the current splitIter have been processed.
-                       if (!splitIter.hasNext()) {
-                               // First remove the state of the split.
-                               splitIter.finishedSplitIds().forEach((id) -> {
-                                       splitStates.remove(id);
-                                       output.releaseOutputForSplit(id);
-                               });
-                               // Handle the finished splits.
-                               onSplitFinished(splitIter.finishedSplitIds());
-                               splitIter.dispose();
-                               // Prepare the return status based on the availability of the next element.
-                               status = finishedOrAvailableLater();
-                       } else {
-                               // There are more records from the current splitIter.
-                               status = InputStatus.MORE_AVAILABLE;
+                       else if (!moveToNextSplit(recordsWithSplitId, output)) {
+                               return trace(finishedOrAvailableLater());
                        }
+                       // else fall through the loop
                }
+       }
+
+       private InputStatus trace(InputStatus status) {
                LOG.trace("Source reader status: {}", status);
                return status;
        }
 
+       @Nullable
+       private RecordsWithSplitIds<E> getCurrentOrNewFetch(final ReaderOutput<T> output) {
+               RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
+               if (recordsWithSplitId != null) {
+                       return recordsWithSplitId;
+               }
+
+               recordsWithSplitId = elementsQueue.poll();
+               if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
+                       // No element available, set to available later if needed.
+                       return null;
+               }
+
+               currentFetch = recordsWithSplitId;
+               return recordsWithSplitId;
+       }
+
+       private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final ReaderOutput<T> output) {
+               currentFetch = null;
+               currentSplitContext = null;
+               currentSplitOutput = null;
+
+               final Set<String> finishedSplits = fetch.finishedSplits();
+               if (!finishedSplits.isEmpty()) {
+                       for (String finishedSplitId : finishedSplits) {
+                               splitStates.remove(finishedSplitId);
+                               output.releaseOutputForSplit(finishedSplitId);
+                       }
+                       onSplitFinished(finishedSplits);
+               }
+
+               fetch.recycle();
+       }
+
+       private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
+               final String nextSplitId = recordsWithSplitIds.nextSplit();
+               if (nextSplitId == null) {
+                       finishCurrentFetch(recordsWithSplitIds, output);
+                       return false;
+               }
+
+               currentSplitContext = splitStates.get(nextSplitId);
+               checkState(currentSplitContext != null, "Have records for a split that was not registered");
+               currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
+               return true;
+       }
+
        @Override
        public CompletableFuture<Void> isAvailable() {
                // The order matters here. We first get the future. After this point, if the queue
@@ -235,7 +268,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
        private InputStatus finishedOrAvailableLater() {
                boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
-               boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
+               boolean allElementsEmitted = elementsQueue.isEmpty();
                if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
                        return InputStatus.END_OF_INPUT;
                } else {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
deleted file mode 100644
index c83bec0..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * A class that wraps around a {@link RecordsWithSplitIds} and provide a consistent iterator.
- */
-public class SplitsRecordIterator<E> {
-
-       private final RecordsWithSplitIds<E> recordsWithSplitIds;
-       private final Iterator<Map.Entry<String, Collection<E>>> splitIter;
-       private String currentSplitId;
-       private Iterator<E> recordsIter;
-
-       /**
-        * Construct a cross-splits iterator for the records.
-        *
-        * @param recordsWithSplitIds the records by splits.
-        */
-       public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) {
-               this.recordsWithSplitIds = recordsWithSplitIds;
-               Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits();
-               // Remove empty splits;
-               recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty());
-               this.splitIter = recordsBySplits.entrySet().iterator();
-       }
-
-       /**
-        * Whether their are more records available.
-        *
-        * @return true if there are more records, false otherwise.
-        */
-       public boolean hasNext() {
-               if (recordsIter == null || !recordsIter.hasNext()) {
-                       if (splitIter.hasNext()) {
-                               Map.Entry<String, Collection<E>> entry = splitIter.next();
-                               currentSplitId = entry.getKey();
-                               recordsIter = entry.getValue().iterator();
-                       } else {
-                               return false;
-                       }
-               }
-               return recordsIter.hasNext() || splitIter.hasNext();
-       }
-
-       /**
-        * Get the next record.
-        * @return the next record.
-        */
-       public E next() {
-               if (!hasNext()) {
-                       throw new NoSuchElementException();
-               }
-               return recordsIter.next();
-       }
-
-       /**
-        * Get the split id of the last returned record.
-        *
-        * @return the split id of the last returned record.
-        */
-       public String currentSplitId() {
-               return currentSplitId;
-       }
-
-       /**
-        * The split Ids that are finished after all the records in this iterator are emitted.
-        *
-        * @return a set of finished split Ids.
-        */
-       public Set<String> finishedSplitIds() {
-               return recordsWithSplitIds.finishedSplits();
-       }
-
-       public void dispose() {
-               recordsWithSplitIds.recycle();
-       }
-}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 953d7aa..e9c2ad2 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -95,9 +95,13 @@ public class SplitFetcherTest {
                        interrupter.start();
 
                        while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
-                               elementQueue.take().recordsBySplits().values().forEach(records ->
-                                               // Ensure there is no duplicate records.
-                                               records.forEach(arr -> assertTrue(recordsRead.add(arr[0]))));
+                               final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
+                               while (nextBatch.nextSplit() != null) {
+                                       int[] arr;
+                                       while ((arr = nextBatch.nextRecordFromSplit()) != null) {
+                                               assertTrue(recordsRead.add(arr[0]));
+                                       }
+                               }
                        }
 
                        assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 3c6d8df..00d4d71 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -81,7 +81,8 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
        }
 
        private RecordsBySplits<int[]> getRecords() {
-               RecordsBySplits<int[]> records = new RecordsBySplits<>();
+               final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();
+
                try {
                        for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) {
                                MockSourceSplit split = entry.getValue();
@@ -102,6 +103,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
                                throw new RuntimeException("Caught unexpected interrupted exception.");
                        }
                }
-               return records;
+               return records.build();
        }
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
index 2a8377a..3aa49ac 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
@@ -18,48 +18,22 @@
 
 package org.apache.flink.connector.base.source.reader.mocks;
 
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
 * A mock implementation of {@link RecordsWithSplitIds} that returns a given set of records.
  */
-public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {
-
-       private final Map<String, Collection<E>> records;
-
-       private final String splitId;
+public class TestingRecordsWithSplitIds<E> extends RecordsBySplits<E> {
 
        private volatile boolean isRecycled;
 
        @SafeVarargs
        public TestingRecordsWithSplitIds(String splitId, E... records) {
-               this.splitId = checkNotNull(splitId);
-               this.records = new HashMap<>();
-               this.records.put(splitId, Arrays.asList(records));
-       }
-
-       @Override
-       public Collection<String> splitIds() {
-               return Collections.singleton(splitId);
-       }
-
-       @Override
-       public Map<String, Collection<E>> recordsBySplits() {
-               return records;
-       }
-
-       @Override
-       public Set<String> finishedSplits() {
-               return Collections.emptySet();
+               super(Collections.singletonMap(splitId, Arrays.asList(records)), Collections.emptySet());
        }
 
        @Override
