This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 77c61b33d0aa7c7a11dd9ace8159db49f803aa9b
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 12 19:42:52 2022 +0800

    [FLINK-25628] Introduce SortMergeReader

    Co-authored-by: Jane Chan <[email protected]>
    Co-authored-by: tsreaper <[email protected]>
---
 .../file/mergetree/compact/SortMergeReader.java    | 213 +++++++++++++++++++++
 .../flink/table/store/file/utils/RecordReader.java |   1 -
 .../store/file/utils/RecordReaderIterator.java     |   1 -
 .../compact/CombiningRecordReaderTestBase.java     | 106 ++++++++++
 .../mergetree/compact/SortMergeReaderTestBase.java | 123 ++++++++++++
 .../store/file/utils/TestReusingRecordReader.java  | 117 +++++++++++
 6 files changed, 559 insertions(+), 2 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
new file mode 100644
index 0000000..0f3c184
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * A {@link RecordReader} that reads a list of {@link RecordReader}s, each already sorted by key and
+ * sequence number, and merges them with a heap-based sort-merge algorithm. {@link KeyValue}s with
+ * the same key are combined by the {@link Accumulator} during the merge.
+ *
+ * <p>NOTE: {@link KeyValue}s from the same {@link RecordReader} must not contain the same key.
+ */
+public class SortMergeReader implements RecordReader {
+
+    private final List<RecordReader> nextBatchReaders; // readers whose next batch readBatch() must fetch
+    private final Comparator<RowData> userKeyComparator;
+    private final Accumulator accumulator;
+
+    private final PriorityQueue<Element> minHeap; // head elements ordered by (user key, sequence number)
+    private final List<Element> polled; // elements taken off the heap for the most recent key
+
+    protected SortMergeReader(
+            List<RecordReader> readers,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator) {
+        this.nextBatchReaders = new ArrayList<>(readers); // every reader starts out needing its first batch
+        this.userKeyComparator = userKeyComparator;
+        this.accumulator = accumulator;
+
+        this.minHeap =
+                new PriorityQueue<>(
+                        (e1, e2) -> { // smaller key first; equal keys by ascending sequence number
+                            int result = userKeyComparator.compare(e1.kv.key(), e2.kv.key());
+                            if (result != 0) {
+                                return result;
+                            }
+                            return Long.compare(e1.kv.sequenceNumber(), e2.kv.sequenceNumber());
+                        });
+        this.polled = new ArrayList<>();
+    }
+
+    public static RecordReader create(
+            List<RecordReader> readers,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator) {
+        return readers.size() == 1 // a single reader is returned as-is; the accumulator is not applied
+                ? readers.get(0)
+                : new SortMergeReader(readers, userKeyComparator, accumulator);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException { // returns null once every reader is exhausted
+        for (RecordReader reader : nextBatchReaders) {
+            while (true) {
+                RecordIterator iterator = reader.readBatch();
+                if (iterator == null) {
+                    // no more batches, permanently remove this reader
+                    reader.close();
+                    break;
+                }
+                KeyValue kv = iterator.next();
+                if (kv == null) {
+                    // empty iterator, clean up and try next batch
+                    iterator.releaseBatch();
+                } else {
+                    // found next kv
+                    minHeap.offer(new Element(kv, iterator, reader));
+                    break;
+                }
+            }
+        }
+        nextBatchReaders.clear(); // each reader is now either closed or represented by one heap element
+
+        return minHeap.isEmpty() ? null : new SortMergeIterator();
+    }
+
+    @Override
+    public void close() throws IOException { // NOTE(review): not idempotent (lists are never cleared), and a failing close() leaves the remaining readers open
+        for (RecordReader reader : nextBatchReaders) {
+            reader.close();
+        }
+        for (Element element : minHeap) { // release each live batch before closing its reader
+            element.iterator.releaseBatch();
+            element.reader.close();
+        }
+        for (Element element : polled) {
+            element.iterator.releaseBatch();
+            element.reader.close();
+        }
+    }
+
+    /** The iterator iterates on {@link SortMergeReader}; it ends as soon as any underlying batch is used up. */
+    private class SortMergeIterator implements RecordIterator {
+
+        private boolean released = false;
+
+        @Override
+        public KeyValue next() throws IOException {
+            while (true) {
+                boolean hasMore = nextImpl();
+                if (!hasMore) {
+                    return null;
+                }
+                RowData accumulatedValue = accumulator.getValue();
+                if (accumulatedValue != null) { // a null result skips this key entirely
+                    return polled.get(polled.size() - 1).kv.setValue(accumulatedValue); // last polled = highest sequence number
+                }
+            }
+        }
+
+        private boolean nextImpl() throws IOException { // NOTE(review): the messages below still call this method "advanceNext"
+            Preconditions.checkState(
+                    !released, "SortMergeIterator#advanceNext is called after release");
+            Preconditions.checkState(
+                    nextBatchReaders.isEmpty(), // fails if next() is called again after it returned null
+                    "SortMergeIterator#advanceNext is called even if the last call returns null. " + "This is a bug.");
+
+            // add previously polled elements back to priority queue
+            for (Element element : polled) {
+                if (element.update()) {
+                    // still kvs left, add back to priority queue
+                    minHeap.offer(element);
+                } else {
+                    // reach end of batch, clean up
+                    element.iterator.releaseBatch();
+                    nextBatchReaders.add(element.reader);
+                }
+            }
+            polled.clear();
+
+            // some readers reached the end of their batch, so end this merged batch; readBatch() fetches their next batches
+            if (!nextBatchReaders.isEmpty()) {
+                return false;
+            }
+
+            accumulator.reset();
+            RowData key =
+                    Preconditions.checkNotNull(minHeap.peek(), "Min heap is empty. This is a bug.") // non-empty here, otherwise the early return above fires
+                            .kv
+                            .key();
+
+            // fetch all elements with the same key
+            // note that the same iterator should not produce the same keys, so this code is correct
+            while (!minHeap.isEmpty()) {
+                Element element = minHeap.peek();
+                if (userKeyComparator.compare(key, element.kv.key()) != 0) {
+                    break;
+                }
+                minHeap.poll();
+                accumulator.add(element.kv.value()); // values arrive in ascending sequence-number order
+                polled.add(element);
+            }
+            return true;
+        }
+
+        @Override
+        public void releaseBatch() {
+            released = true; // only flags this iterator; underlying batches are released in nextImpl() or close()
+        }
+    }
+
+    private static class Element { // one reader's current KeyValue plus the batch and reader it came from
+        private KeyValue kv;
+        private final RecordIterator iterator;
+        private final RecordReader reader;
+
+        private Element(KeyValue kv, RecordIterator iterator, RecordReader reader) {
+            this.kv = kv;
+            this.iterator = iterator;
+            this.reader = reader;
+        }
+
+        // IMPORTANT: Must not call this for elements still in priority queue! It replaces kv, which the heap comparator reads.
+        private boolean update() throws IOException {
+            KeyValue nextKv = iterator.next();
+            if (nextKv == null) {
+                return false;
+            }
+            kv = nextKv;
+            return true;
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
index 89194e1..5d9113d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.table.store.file.KeyValue;
-
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
index 58ad8e2..8353965 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.utils;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.util.CloseableIterator;
-
 import java.io.IOException;
 
 /** Wrap a {@link RecordReader} as an {@link CloseableIterator}.
*/
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
new file mode 100644
index 0000000..6fb2ac6
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RecordReader}s which combine several other {@link RecordReader}s. */
+public abstract class CombiningRecordReaderTestBase {
+
+    protected static final RecordComparator KEY_COMPARATOR =
+            (a, b) -> Integer.compare(a.getInt(0), b.getInt(0)); // keys are ordered by their int field 0
+
+    protected abstract boolean addOnly(); // forwarded to generateOrderedNoDuplicatedKeys; presumably true restricts random data to additions
+
+    protected abstract List<ReusingTestData> getExpected(List<ReusingTestData> input); // expected output for all readers' records concatenated
+
+    protected abstract RecordReader createRecordReader(List<TestReusingRecordReader> readers); // builds the combining reader under test
+
+    @RepeatedTest(100)
+    public void testRandom() throws IOException {
+        runTest(generateRandomData());
+    }
+
+    protected List<List<ReusingTestData>> parseData(String... stringsData) { // one data string per reader
+        List<List<ReusingTestData>> readersData = new ArrayList<>();
+        for (String stringData : stringsData) {
+            readersData.add(ReusingTestData.parse(stringData));
+        }
+        return readersData;
+    }
+
+    protected List<List<ReusingTestData>> generateRandomData() { // 1 to 20 readers; the generator's size argument is 1 to 100
+        Random random = new Random();
+        int numReaders = random.nextInt(20) + 1;
+        List<List<ReusingTestData>> readersData = new ArrayList<>();
+        for (int i = 0; i < numReaders; i++) {
+            readersData.add(
+                    ReusingTestData.generateOrderedNoDuplicatedKeys(
+                            random.nextInt(100) + 1, addOnly()));
+        }
+        return readersData;
+    }
+
+    protected void runTest(List<List<ReusingTestData>> readersData) throws IOException { // drains the reader under test, compares with getExpected(), then checks clean-up
+        Iterator<ReusingTestData> expectedIterator =
+                getExpected(
+                                readersData.stream()
+                                        .flatMap(Collection::stream)
+                                        .collect(Collectors.toList()))
+                        .iterator();
+        List<TestReusingRecordReader> readers = new ArrayList<>();
+        for (List<ReusingTestData> readerData : readersData) {
+            readers.add(new TestReusingRecordReader(readerData));
+        }
+        RecordReader recordReader = createRecordReader(readers);
+
+        RecordReader.RecordIterator batch;
+        while ((batch = recordReader.readBatch()) != null) {
+            KeyValue kv;
+            while ((kv = batch.next()) != null) {
+                assertThat(expectedIterator.hasNext()).isTrue();
+                ReusingTestData expected = expectedIterator.next();
+                expected.assertEquals(kv); // checked immediately, since the KeyValue instance may be reused
+            }
+            batch.releaseBatch();
+        }
+        assertThat(expectedIterator.hasNext()).isFalse();
+        recordReader.close();
+
+        for (TestReusingRecordReader reader : readers) {
+            reader.assertCleanUp(); // every test reader closed and every batch it produced released
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
new file mode 100644
index 0000000..d77fba0
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
@@ -0,0
+1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Tests for {@link SortMergeReader}; subclasses supply the {@link Accumulator} under test. */
+public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestBase {
+
+    protected abstract Accumulator createAccumulator();
+
+    @Override
+    protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) {
+        return new SortMergeReader(new ArrayList<>(readers), KEY_COMPARATOR, createAccumulator()); // the copy widens to List<RecordReader>; presumably the constructor (not create()) is used so one reader is still merged
+    }
+
+    @Test
+    public void testEmpty() throws IOException { // readers whose data strings are empty
+        runTest(parseData(""));
+        runTest(parseData("", "", ""));
+    }
+
+    @Test
+    public void testAlternateKeys() throws IOException { // keys interleave across readers; records are '|'-separated, fields presumably key, sequence number, kind, value
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, +, 200 | 7, 4, +, 600 | 9, 20, +, 400",
+                        "0, 5, +, 0",
+                        "0, 10, +, 0",
+                        "",
+                        "2, 6, +, 200 | 4, 7, +, 400 | 6, 8, +, 600 | 8, 9, +, 800"));
+    }
+
+    @Test
+    public void testDuplicateKeys() throws IOException { // the same keys appear in two readers
+        runTest(parseData("1, 1, +, 100 | 3, 3, +, 300", "1, 4, +, 200 | 3, 5, +, 300"));
+    }
+
+    @Test
+    public void testLongTailRecords() throws IOException { // one reader keeps producing records after the other runs out
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 2, 500, +, 200",
+                        "1, 3, +, 100 | 3, 4, +, 300 | 5, 501, +, 500 | 7, 503, +, 700 | "
+                                + "8, 504, +, 800 | 9, 505, +, 900 | 10, 506, +, 1000 | "
+                                + "11, 507, +, 1100 | 12, 508, +, 1200 | 13, 509, +, 1300"));
+    }
+
+    /** Tests for {@link SortMergeReader} with {@link DeduplicateAccumulator}. */
+    public static class WithDeduplicateAccumulator extends SortMergeReaderTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return false;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+            return AccumulatorTestUtils.getExpectedForDeduplicate(input);
+        }
+
+        @Override
+        protected Accumulator createAccumulator() {
+            return new DeduplicateAccumulator();
+        }
+    }
+
+    /** Tests for {@link SortMergeReader} with {@link ValueCountAccumulator}. */
+    public static class WithValueRecordAccumulatorTest extends SortMergeReaderTestBase { // NOTE(review): name differs from WithDeduplicateAccumulator and the Javadoc; WithValueCountAccumulator would match
+
+        @Override
+        protected boolean addOnly() {
+            return true;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+            return AccumulatorTestUtils.getExpectedForValueCount(input);
+        }
+
+        @Override
+        protected Accumulator createAccumulator() {
+            return new ValueCountAccumulator();
+        }
+
+        @Test
+        public void testCancelingRecords() throws IOException { // some keys' values sum to zero across readers (e.g. 100 and -100)
+            runTest(
+                    parseData(
+                            "1, 1, +, 100 | 3, 5, +, -300 | 5, 300, +, 300",
+                            "",
+                            "1, 4, +, -200 | 3, 3, +, 300",
+                            "5, 100, +, -200 | 7, 123, +, -500",
+                            "7, 321, +, 200",
+                            "7, 456, +, 300"));
+            runTest(parseData("1, 2, +, 100", "1, 1, +, -100"));
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
new file mode 100644
index 0000000..a59ef95
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A testing {@link RecordReader} using {@link ReusingTestData} which produces batches of random
+ * sizes (possibly empty). {@link KeyValue}s produced by the same reader are reused to test that
+ * other components correctly handle the reuse.
+ */
+public class TestReusingRecordReader implements RecordReader {
+
+    private final List<ReusingTestData> testData;
+    private final ReusingKeyValue reuse; // the one object updated for every record this reader emits
+
+    private final List<TestRecordIterator> producedBatches; // kept so assertCleanUp() can check each was released
+    private final Random random;
+
+    private int nextLowerBound; // start index of the next batch; -1 once readBatch() has returned null
+    private boolean closed;
+
+    public TestReusingRecordReader(List<ReusingTestData> testData) {
+        this.testData = testData;
+        this.reuse = new ReusingKeyValue();
+
+        this.producedBatches = new ArrayList<>();
+        this.random = new Random();
+
+        this.nextLowerBound = 0;
+        this.closed = false;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() {
+        assertThat(nextLowerBound != -1).isTrue(); // must not be called again after returning null
+        if (nextLowerBound == testData.size() && random.nextBoolean()) { // data used up: randomly end now or hand out another empty batch
+            nextLowerBound = -1;
+            return null;
+        }
+        int upperBound = random.nextInt(testData.size() - nextLowerBound + 1) + nextLowerBound; // random in [nextLowerBound, testData.size()]
+        TestRecordIterator iterator = new TestRecordIterator(nextLowerBound, upperBound);
+        nextLowerBound = upperBound;
+        producedBatches.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+    }
+
+    public void assertCleanUp() { // asserts close() was called and every produced batch was released
+        assertThat(closed).isTrue();
+        for (TestRecordIterator iterator : producedBatches) {
+            assertThat(iterator.released).isTrue();
+        }
+    }
+
+    private class TestRecordIterator implements RecordIterator { // yields testData[lowerBound, upperBound) through the shared reuse object
+
+        private final int upperBound;
+
+        private int next; // -1 once next() has returned null
+        private boolean released;
+
+        private TestRecordIterator(int lowerBound, int upperBound) {
+            this.upperBound = upperBound;
+
+            this.next = lowerBound;
+            this.released = false;
+        }
+
+        @Override
+        public KeyValue next() throws IOException {
+            assertThat(next != -1).isTrue(); // must not be called again after returning null
+            if (next == upperBound) {
+                next = -1;
+                return null;
+            }
+            KeyValue result = reuse.update(testData.get(next));
+            next++;
+            return result;
+        }
+
+        @Override
+        public void releaseBatch() {
+            this.released = true;
+        }
+    }
+}
