This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b8733d7d6f21f5c54716e594e9e7f9c950c06be5 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Sep 7 00:20:38 2020 +0200 [FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight. This turns the RecordsWithSplitIds structure from a holder of materialized collections to a simple iterator-like structure to allow for lazy materialization and object reuse. --- .../base/source/reader/RecordsBySplits.java | 179 +++++++++++++-------- .../base/source/reader/RecordsWithSplitIds.java | 20 +-- .../base/source/reader/SourceReaderBase.java | 117 +++++++++----- .../base/source/reader/SplitsRecordIterator.java | 100 ------------ .../source/reader/fetcher/SplitFetcherTest.java | 10 +- .../base/source/reader/mocks/MockSplitReader.java | 5 +- .../reader/mocks/TestingRecordsWithSplitIds.java | 32 +--- 7 files changed, 212 insertions(+), 251 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java index 77cb594..0b15432 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java @@ -20,83 +20,59 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.connector.source.SourceSplit; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * An implementation of RecordsWithSplitIds to host all the records by splits. */ public class RecordsBySplits<E> implements RecordsWithSplitIds<E> { - private Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>(); - private Set<String> finishedSplits = new HashSet<>(); - /** - * Add the record from the given split ID. - * - * @param splitId the split ID the record was from. - * @param record the record to add. - */ - public void add(String splitId, E record) { - recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record); - } + private final Set<String> finishedSplits; - /** - * Add the record from the given source split. - * - * @param split the source split the record was from. - * @param record the record to add. - */ - public void add(SourceSplit split, E record) { - add(split.splitId(), record); - } + private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator; - /** - * Add multiple records from the given split ID. - * - * @param splitId the split ID given the records were from. - * @param records the records to add. - */ - public void addAll(String splitId, Collection<E> records) { - this.recordsBySplits.compute(splitId, (id, r) -> { - if (r == null) { - r = records; - } else { - r.addAll(records); - } - return r; - }); - } + @Nullable + private Iterator<E> recordsInCurrentSplit; - /** - * Add multiple records from the given source split. - * - * @param split the source split the records were from. - * @param records the records to add. - */ - public void addAll(SourceSplit split, Collection<E> records) { - addAll(split.splitId(), records); + public RecordsBySplits( + final Map<String, Collection<E>> recordsBySplit, + final Set<String> finishedSplits) { + + this.splitsIterator = checkNotNull(recordsBySplit, "recordsBySplit").entrySet().iterator(); + this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits"); } - /** - * Mark the split with the given ID as finished. - * - * @param splitId the ID of the finished split. - */ - public void addFinishedSplit(String splitId) { - finishedSplits.add(splitId); + @Nullable + @Override + public String nextSplit() { + if (splitsIterator.hasNext()) { + final Map.Entry<String, Collection<E>> next = splitsIterator.next(); + recordsInCurrentSplit = next.getValue().iterator(); + return next.getKey(); + } else { + return null; + } } - /** - * Mark multiple splits with the given IDs as finished. - * - * @param splitIds the IDs of the finished splits. - */ - public void addFinishedSplits(Collection<String> splitIds) { - finishedSplits.addAll(splitIds); + @Nullable + @Override + public E nextRecordFromSplit() { + if (recordsInCurrentSplit == null) { + throw new IllegalStateException(); + } + + return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next() : null; } @Override @@ -104,13 +80,86 @@ public class RecordsBySplits<E> implements RecordsWithSplitIds<E> { return finishedSplits; } - @Override - public Collection<String> splitIds() { - return recordsBySplits.keySet(); - } + // ------------------------------------------------------------------------ - @Override - public Map<String, Collection<E>> recordsBySplits() { - return recordsBySplits; + /** + * A utility builder to collect records in individual calls, rather than put a finished collection + * in the {@link RecordsBySplits#RecordsBySplits(Map, Set)} constructor. + */ + public static class Builder<E> { + + private final Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>(); + private final Set<String> finishedSplits = new HashSet<>(2); + + /** + * Add the record from the given split ID. + * + * @param splitId the split ID the record was from. + * @param record the record to add. + */ + public void add(String splitId, E record) { + recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record); + } + + /** + * Add the record from the given source split. + * + * @param split the source split the record was from. + * @param record the record to add. + */ + public void add(SourceSplit split, E record) { + add(split.splitId(), record); + } + + /** + * Add multiple records from the given split ID. + * + * @param splitId the split ID given the records were from. + * @param records the records to add. + */ + public void addAll(String splitId, Collection<E> records) { + this.recordsBySplits.compute(splitId, (id, r) -> { + if (r == null) { + r = records; + } else { + r.addAll(records); + } + return r; + }); + } + + /** + * Add multiple records from the given source split. + * + * @param split the source split the records were from. + * @param records the records to add. + */ + public void addAll(SourceSplit split, Collection<E> records) { + addAll(split.splitId(), records); + } + + /** + * Mark the split with the given ID as finished. + * + * @param splitId the ID of the finished split. + */ + public void addFinishedSplit(String splitId) { + finishedSplits.add(splitId); + } + + /** + * Mark multiple splits with the given IDs as finished. + * + * @param splitIds the IDs of the finished splits. + */ + public void addFinishedSplits(Collection<String> splitIds) { + finishedSplits.addAll(splitIds); + } + + public RecordsBySplits<E> build() { + return new RecordsBySplits<>( + recordsBySplits.isEmpty() ? Collections.emptyMap() : recordsBySplits, + finishedSplits.isEmpty() ? Collections.emptySet() : finishedSplits); + } } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java index dc915b3..0c8fd07 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.base.source.reader; -import java.util.Collection; -import java.util.Map; +import javax.annotation.Nullable; + import java.util.Set; /** @@ -28,18 +28,18 @@ import java.util.Set; public interface RecordsWithSplitIds<E> { /** - * Get all the split ids. - * - * @return a collection of split ids. + * Moves to the next split. This method is also called initially to move to the + * first split. Returns null, if no splits are left. */ - Collection<String> splitIds(); + @Nullable + String nextSplit(); /** - * Get all the records by Splits. - * - * @return a mapping from split ids to the records. + * Gets the next record from the current split. Returns null if no more records are left + * in this split. */ - Map<String, Collection<E>> recordsBySplits(); + @Nullable + E nextRecordFromSplit(); /** * Get the finished splits. diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 4a41d49..02b7a7c 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -35,14 +35,19 @@ import org.apache.flink.core.io.InputStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.util.Preconditions.checkState; + /** * An abstract implementation of {@link SourceReader} which provides some sychronization between * the mail box main thread and the SourceReader internal threads. This class allows user to @@ -81,8 +86,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt /** The context of this source reader. */ protected SourceReaderContext context; - /** The last element to ensure it is fully handled. */ - private SplitsRecordIterator<E> splitIter; + /** The latest fetched batch of records-by-split from the split reader. */ + @Nullable private RecordsWithSplitIds<E> currentFetch; + @Nullable private SplitContext<T, SplitStateT> currentSplitContext; + @Nullable private SourceOutput<T> currentSplitOutput; /** Indicating whether the SourceReader will be assigned more splits or not.*/ private boolean noMoreSplitsAssignment; @@ -99,7 +106,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt this.splitFetcherManager = splitFetcherManager; this.recordEmitter = recordEmitter; this.splitStates = new HashMap<>(); - this.splitIter = null; this.options = new SourceReaderOptions(config); this.config = config; this.context = context; @@ -107,60 +113,87 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt } @Override - public void start() { - - } + public void start() {} @Override public InputStatus pollNext(ReaderOutput<T> output) throws Exception { splitFetcherManager.checkErrors(); - // poll from the queue if the last element was successfully handled. Otherwise - // just pass the last element again. - RecordsWithSplitIds<E> recordsWithSplitId = null; - boolean newFetch = splitIter == null || !splitIter.hasNext(); - if (newFetch) { - recordsWithSplitId = elementsQueue.poll(); + + // make sure we have a fetch we are working on, or move to the next + final RecordsWithSplitIds<E> recordsWithSplitId = getCurrentOrNewFetch(output); + if (recordsWithSplitId == null) { + return trace(finishedOrAvailableLater()); } - InputStatus status; - if (newFetch && recordsWithSplitId == null) { - // No element available, set to available later if needed. - status = finishedOrAvailableLater(); - } else { - // Update the record iterator if it is a new fetch. - if (newFetch) { - splitIter = new SplitsRecordIterator<>(recordsWithSplitId); - } + // we need to loop here, because we may have to go across splits + while (true) { // Process one record. - if (splitIter.hasNext()) { + final E record = recordsWithSplitId.nextRecordFromSplit(); + if (record != null) { // emit the record. - final E record = splitIter.next(); - final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId()); - final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output); - recordEmitter.emitRecord(record, splitOutput, splitContext.state); + recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state); LOG.trace("Emitted record: {}", record); + return trace(InputStatus.MORE_AVAILABLE); } - // Do some cleanup if the all the records in the current splitIter have been processed. - if (!splitIter.hasNext()) { - // First remove the state of the split. - splitIter.finishedSplitIds().forEach((id) -> { - splitStates.remove(id); - output.releaseOutputForSplit(id); - }); - // Handle the finished splits. - onSplitFinished(splitIter.finishedSplitIds()); - splitIter.dispose(); - // Prepare the return status based on the availability of the next element. - status = finishedOrAvailableLater(); - } else { - // There are more records from the current splitIter. - status = InputStatus.MORE_AVAILABLE; + else if (!moveToNextSplit(recordsWithSplitId, output)) { + return trace(finishedOrAvailableLater()); } + // else fall through the loop } + } + + private InputStatus trace(InputStatus status) { LOG.trace("Source reader status: {}", status); return status; } + @Nullable + private RecordsWithSplitIds<E> getCurrentOrNewFetch(final ReaderOutput<T> output) { + RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch; + if (recordsWithSplitId != null) { + return recordsWithSplitId; + } + + recordsWithSplitId = elementsQueue.poll(); + if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) { + // No element available, set to available later if needed. + return null; + } + + currentFetch = recordsWithSplitId; + return recordsWithSplitId; + } + + private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final ReaderOutput<T> output) { + currentFetch = null; + currentSplitContext = null; + currentSplitOutput = null; + + final Set<String> finishedSplits = fetch.finishedSplits(); + if (!finishedSplits.isEmpty()) { + for (String finishedSplitId : finishedSplits) { + splitStates.remove(finishedSplitId); + output.releaseOutputForSplit(finishedSplitId); + } + onSplitFinished(finishedSplits); + } + + fetch.recycle(); + } + + private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) { + final String nextSplitId = recordsWithSplitIds.nextSplit(); + if (nextSplitId == null) { + finishCurrentFetch(recordsWithSplitIds, output); + return false; + } + + currentSplitContext = splitStates.get(nextSplitId); + checkState(currentSplitContext != null, "Have records for a split that was not registered"); + currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output); + return true; + } + @Override public CompletableFuture<Void> isAvailable() { // The order matters here. We first get the future. After this point, if the queue @@ -235,7 +268,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt private InputStatus finishedOrAvailableLater() { boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers(); - boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext()); + boolean allElementsEmitted = elementsQueue.isEmpty(); if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) { return InputStatus.END_OF_INPUT; } else { diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java deleted file mode 100644 index c83bec0..0000000 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.base.source.reader; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; - -/** - * A class that wraps around a {@link RecordsWithSplitIds} and provide a consistent iterator. - */ -public class SplitsRecordIterator<E> { - - private final RecordsWithSplitIds<E> recordsWithSplitIds; - private final Iterator<Map.Entry<String, Collection<E>>> splitIter; - private String currentSplitId; - private Iterator<E> recordsIter; - - /** - * Construct a cross-splits iterator for the records. - * - * @param recordsWithSplitIds the records by splits. - */ - public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) { - this.recordsWithSplitIds = recordsWithSplitIds; - Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits(); - // Remove empty splits; - recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty()); - this.splitIter = recordsBySplits.entrySet().iterator(); - } - - /** - * Whether their are more records available. - * - * @return true if there are more records, false otherwise. - */ - public boolean hasNext() { - if (recordsIter == null || !recordsIter.hasNext()) { - if (splitIter.hasNext()) { - Map.Entry<String, Collection<E>> entry = splitIter.next(); - currentSplitId = entry.getKey(); - recordsIter = entry.getValue().iterator(); - } else { - return false; - } - } - return recordsIter.hasNext() || splitIter.hasNext(); - } - - /** - * Get the next record. - * @return the next record. - */ - public E next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return recordsIter.next(); - } - - /** - * Get the split id of the last returned record. - * - * @return the split id of the last returned record. - */ - public String currentSplitId() { - return currentSplitId; - } - - /** - * The split Ids that are finished after all the records in this iterator are emitted. - * - * @return a set of finished split Ids. - */ - public Set<String> finishedSplitIds() { - return recordsWithSplitIds.finishedSplits(); - } - - public void dispose() { - recordsWithSplitIds.recycle(); - } -} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index 953d7aa..e9c2ad2 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -95,9 +95,13 @@ public class SplitFetcherTest { interrupter.start(); while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) { - elementQueue.take().recordsBySplits().values().forEach(records -> - // Ensure there is no duplicate records. - records.forEach(arr -> assertTrue(recordsRead.add(arr[0])))); + final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take(); + while (nextBatch.nextSplit() != null) { + int[] arr; + while ((arr = nextBatch.nextRecordFromSplit()) != null) { + assertTrue(recordsRead.add(arr[0])); + } + } } assertEquals(NUM_TOTAL_RECORDS, recordsRead.size()); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java index 3c6d8df..00d4d71 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java @@ -81,7 +81,8 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { } private RecordsBySplits<int[]> getRecords() { - RecordsBySplits<int[]> records = new RecordsBySplits<>(); + final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>(); + try { for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) { MockSourceSplit split = entry.getValue(); @@ -102,6 +103,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { throw new RuntimeException("Caught unexpected interrupted exception."); } } - return records; + return records.build(); } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java index 2a8377a..3aa49ac 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java @@ -18,48 +18,22 @@ package org.apache.flink.connector.base.source.reader.mocks; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * A mock implementation of {@link RecordsWithSplitIds} that returns a given set of records. */ -public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> { - - private final Map<String, Collection<E>> records; - - private final String splitId; +public class TestingRecordsWithSplitIds<E> extends RecordsBySplits<E> { private volatile boolean isRecycled; @SafeVarargs public TestingRecordsWithSplitIds(String splitId, E... records) { - this.splitId = checkNotNull(splitId); - this.records = new HashMap<>(); - this.records.put(splitId, Arrays.asList(records)); - } - - @Override - public Collection<String> splitIds() { - return Collections.singleton(splitId); - } - - @Override - public Map<String, Collection<E>> recordsBySplits() { - return records; - } - - @Override - public Set<String> finishedSplits() { - return Collections.emptySet(); + super(Collections.singletonMap(splitId, Arrays.asList(records)), Collections.emptySet()); } @Override
