This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 77c61b33d0aa7c7a11dd9ace8159db49f803aa9b
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 12 19:42:52 2022 +0800

    [FLINK-25628] Introduce SortMergeReader
    
    Co-authored-by: Jane Chan <[email protected]>
    Co-authored-by: tsreaper <[email protected]>
---
 .../file/mergetree/compact/SortMergeReader.java    | 213 +++++++++++++++++++++
 .../flink/table/store/file/utils/RecordReader.java |   1 -
 .../store/file/utils/RecordReaderIterator.java     |   1 -
 .../compact/CombiningRecordReaderTestBase.java     | 106 ++++++++++
 .../mergetree/compact/SortMergeReaderTestBase.java | 123 ++++++++++++
 .../store/file/utils/TestReusingRecordReader.java  | 117 +++++++++++
 6 files changed, 559 insertions(+), 2 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
new file mode 100644
index 0000000..0f3c184
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * This reader is to read a list of {@link RecordReader}, which is already 
sorted by key and
+ * sequence number, and perform a sort merge algorithm. {@link KeyValue}s with 
the same key will
+ * also be combined during sort merging.
+ *
+ * <p>NOTE: {@link KeyValue}s from the same {@link RecordReader} must not 
contain the same key.
+ */
+public class SortMergeReader implements RecordReader {
+
+    private final List<RecordReader> nextBatchReaders;
+    private final Comparator<RowData> userKeyComparator;
+    private final Accumulator accumulator;
+
+    private final PriorityQueue<Element> minHeap;
+    private final List<Element> polled;
+
+    protected SortMergeReader(
+            List<RecordReader> readers,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator) {
+        this.nextBatchReaders = new ArrayList<>(readers);
+        this.userKeyComparator = userKeyComparator;
+        this.accumulator = accumulator;
+
+        this.minHeap =
+                new PriorityQueue<>(
+                        (e1, e2) -> {
+                            int result = 
userKeyComparator.compare(e1.kv.key(), e2.kv.key());
+                            if (result != 0) {
+                                return result;
+                            }
+                            return Long.compare(e1.kv.sequenceNumber(), 
e2.kv.sequenceNumber());
+                        });
+        this.polled = new ArrayList<>();
+    }
+
+    public static RecordReader create(
+            List<RecordReader> readers,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator) {
+        return readers.size() == 1
+                ? readers.get(0)
+                : new SortMergeReader(readers, userKeyComparator, accumulator);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        for (RecordReader reader : nextBatchReaders) {
+            while (true) {
+                RecordIterator iterator = reader.readBatch();
+                if (iterator == null) {
+                    // no more batches, permanently remove this reader
+                    reader.close();
+                    break;
+                }
+                KeyValue kv = iterator.next();
+                if (kv == null) {
+                    // empty iterator, clean up and try next batch
+                    iterator.releaseBatch();
+                } else {
+                    // found next kv
+                    minHeap.offer(new Element(kv, iterator, reader));
+                    break;
+                }
+            }
+        }
+        nextBatchReaders.clear();
+
+        return minHeap.isEmpty() ? null : new SortMergeIterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (RecordReader reader : nextBatchReaders) {
+            reader.close();
+        }
+        for (Element element : minHeap) {
+            element.iterator.releaseBatch();
+            element.reader.close();
+        }
+        for (Element element : polled) {
+            element.iterator.releaseBatch();
+            element.reader.close();
+        }
+    }
+
+    /** The iterator iterates on {@link SortMergeReader}. */
+    private class SortMergeIterator implements RecordIterator {
+
+        private boolean released = false;
+
+        @Override
+        public KeyValue next() throws IOException {
+            while (true) {
+                boolean hasMore = nextImpl();
+                if (!hasMore) {
+                    return null;
+                }
+                RowData accumulatedValue = accumulator.getValue();
+                if (accumulatedValue != null) {
+                    return polled.get(polled.size() - 
1).kv.setValue(accumulatedValue);
+                }
+            }
+        }
+
+        private boolean nextImpl() throws IOException {
+            Preconditions.checkState(
+                    !released, "SortMergeIterator#advanceNext is called after 
release");
+            Preconditions.checkState(
+                    nextBatchReaders.isEmpty(),
+                    "SortMergeIterator#advanceNext is called even if the last 
call returns null. "
+                            + "This is a bug.");
+
+            // add previously polled elements back to priority queue
+            for (Element element : polled) {
+                if (element.update()) {
+                    // still kvs left, add back to priority queue
+                    minHeap.offer(element);
+                } else {
+                    // reach end of batch, clean up
+                    element.iterator.releaseBatch();
+                    nextBatchReaders.add(element.reader);
+                }
+            }
+            polled.clear();
+
+            // there are readers reaching end of batch, so we end current batch
+            if (!nextBatchReaders.isEmpty()) {
+                return false;
+            }
+
+            accumulator.reset();
+            RowData key =
+                    Preconditions.checkNotNull(minHeap.peek(), "Min heap is 
empty. This is a bug.")
+                            .kv
+                            .key();
+
+            // fetch all elements with the same key
+            // note that the same iterator should not produce the same keys, 
so this code is correct
+            while (!minHeap.isEmpty()) {
+                Element element = minHeap.peek();
+                if (userKeyComparator.compare(key, element.kv.key()) != 0) {
+                    break;
+                }
+                minHeap.poll();
+                accumulator.add(element.kv.value());
+                polled.add(element);
+            }
+            return true;
+        }
+
+        @Override
+        public void releaseBatch() {
+            released = true;
+        }
+    }
+
+    private static class Element {
+        private KeyValue kv;
+        private final RecordIterator iterator;
+        private final RecordReader reader;
+
+        private Element(KeyValue kv, RecordIterator iterator, RecordReader 
reader) {
+            this.kv = kv;
+            this.iterator = iterator;
+            this.reader = reader;
+        }
+
+        // IMPORTANT: Must not call this for elements still in priority queue!
+        private boolean update() throws IOException {
+            KeyValue nextKv = iterator.next();
+            if (nextKv == null) {
+                return false;
+            }
+            kv = nextKv;
+            return true;
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
index 89194e1..5d9113d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.table.store.file.KeyValue;
 
-
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
index 58ad8e2..8353965 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.utils;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.util.CloseableIterator;
 
-
 import java.io.IOException;
 
 /** Wrap a {@link RecordReader} as an {@link CloseableIterator}. */
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
new file mode 100644
index 0000000..6fb2ac6
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RecordReader}s which combines several other {@link 
RecordReader}s. */
+public abstract class CombiningRecordReaderTestBase {
+
+    protected static final RecordComparator KEY_COMPARATOR =
+            (a, b) -> Integer.compare(a.getInt(0), b.getInt(0));
+
+    protected abstract boolean addOnly();
+
+    protected abstract List<ReusingTestData> getExpected(List<ReusingTestData> 
input);
+
+    protected abstract RecordReader 
createRecordReader(List<TestReusingRecordReader> readers);
+
+    @RepeatedTest(100)
+    public void testRandom() throws IOException {
+        runTest(generateRandomData());
+    }
+
+    protected List<List<ReusingTestData>> parseData(String... stringsData) {
+        List<List<ReusingTestData>> readersData = new ArrayList<>();
+        for (String stringData : stringsData) {
+            readersData.add(ReusingTestData.parse(stringData));
+        }
+        return readersData;
+    }
+
+    protected List<List<ReusingTestData>> generateRandomData() {
+        Random random = new Random();
+        int numReaders = random.nextInt(20) + 1;
+        List<List<ReusingTestData>> readersData = new ArrayList<>();
+        for (int i = 0; i < numReaders; i++) {
+            readersData.add(
+                    ReusingTestData.generateOrderedNoDuplicatedKeys(
+                            random.nextInt(100) + 1, addOnly()));
+        }
+        return readersData;
+    }
+
+    protected void runTest(List<List<ReusingTestData>> readersData) throws 
IOException {
+        Iterator<ReusingTestData> expectedIterator =
+                getExpected(
+                                readersData.stream()
+                                        .flatMap(Collection::stream)
+                                        .collect(Collectors.toList()))
+                        .iterator();
+        List<TestReusingRecordReader> readers = new ArrayList<>();
+        for (List<ReusingTestData> readerData : readersData) {
+            readers.add(new TestReusingRecordReader(readerData));
+        }
+        RecordReader recordReader = createRecordReader(readers);
+
+        RecordReader.RecordIterator batch;
+        while ((batch = recordReader.readBatch()) != null) {
+            KeyValue kv;
+            while ((kv = batch.next()) != null) {
+                assertThat(expectedIterator.hasNext()).isTrue();
+                ReusingTestData expected = expectedIterator.next();
+                expected.assertEquals(kv);
+            }
+            batch.releaseBatch();
+        }
+        assertThat(expectedIterator.hasNext()).isFalse();
+        recordReader.close();
+
+        for (TestReusingRecordReader reader : readers) {
+            reader.assertCleanUp();
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
new file mode 100644
index 0000000..d77fba0
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Tests for {@link SortMergeReader}. */
+public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestBase {
+
+    protected abstract Accumulator createAccumulator();
+
+    @Override
+    protected RecordReader createRecordReader(List<TestReusingRecordReader> 
readers) {
+        return new SortMergeReader(new ArrayList<>(readers), KEY_COMPARATOR, 
createAccumulator());
+    }
+
+    @Test
+    public void testEmpty() throws IOException {
+        runTest(parseData(""));
+        runTest(parseData("", "", ""));
+    }
+
+    @Test
+    public void testAlternateKeys() throws IOException {
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, +, 200 | 7, 4, +, 
600 | 9, 20, +, 400",
+                        "0, 5, +, 0",
+                        "0, 10, +, 0",
+                        "",
+                        "2, 6, +, 200 | 4, 7, +, 400 | 6, 8, +, 600 | 8, 9, +, 
800"));
+    }
+
+    @Test
+    public void testDuplicateKeys() throws IOException {
+        runTest(parseData("1, 1, +, 100 | 3, 3, +, 300", "1, 4, +, 200 | 3, 5, 
+, 300"));
+    }
+
+    @Test
+    public void testLongTailRecords() throws IOException {
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 2, 500, +, 200",
+                        "1, 3, +, 100 | 3, 4, +, 300 | 5, 501, +, 500 | 7, 
503, +, 700 | "
+                                + "8, 504, +, 800 | 9, 505, +, 900 | 10, 506, 
+, 1000 | "
+                                + "11, 507, +, 1100 | 12, 508, +, 1200 | 13, 
509, +, 1300"));
+    }
+
+    /** Tests for {@link SortMergeReader} with {@link DeduplicateAccumulator}. 
*/
+    public static class WithDeduplicateAccumulator extends 
SortMergeReaderTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return false;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> 
input) {
+            return AccumulatorTestUtils.getExpectedForDeduplicate(input);
+        }
+
+        @Override
+        protected Accumulator createAccumulator() {
+            return new DeduplicateAccumulator();
+        }
+    }
+
+    /** Tests for {@link SortMergeReader} with {@link ValueCountAccumulator}. 
*/
+    public static class WithValueRecordAccumulatorTest extends 
SortMergeReaderTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return true;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> 
input) {
+            return AccumulatorTestUtils.getExpectedForValueCount(input);
+        }
+
+        @Override
+        protected Accumulator createAccumulator() {
+            return new ValueCountAccumulator();
+        }
+
+        @Test
+        public void testCancelingRecords() throws IOException {
+            runTest(
+                    parseData(
+                            "1, 1, +, 100 | 3, 5, +, -300 | 5, 300, +, 300",
+                            "",
+                            "1, 4, +, -200 | 3, 3, +, 300",
+                            "5, 100, +, -200 | 7, 123, +, -500",
+                            "7, 321, +, 200",
+                            "7, 456, +, 300"));
+            runTest(parseData("1, 2, +, 100", "1, 1, +, -100"));
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
new file mode 100644
index 0000000..a59ef95
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A testing {@link RecordReader} using {@link ReusingTestData} which produces 
batches of random
+ * sizes (possibly empty). {@link KeyValue}s produced by the same reader is 
reused to test that
+ * other components correctly handles the reusing.
+ */
+public class TestReusingRecordReader implements RecordReader {
+
+    private final List<ReusingTestData> testData;
+    private final ReusingKeyValue reuse;
+
+    private final List<TestRecordIterator> producedBatches;
+    private final Random random;
+
+    private int nextLowerBound;
+    private boolean closed;
+
+    public TestReusingRecordReader(List<ReusingTestData> testData) {
+        this.testData = testData;
+        this.reuse = new ReusingKeyValue();
+
+        this.producedBatches = new ArrayList<>();
+        this.random = new Random();
+
+        this.nextLowerBound = 0;
+        this.closed = false;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() {
+        assertThat(nextLowerBound != -1).isTrue();
+        if (nextLowerBound == testData.size() && random.nextBoolean()) {
+            nextLowerBound = -1;
+            return null;
+        }
+        int upperBound = random.nextInt(testData.size() - nextLowerBound + 1) 
+ nextLowerBound;
+        TestRecordIterator iterator = new TestRecordIterator(nextLowerBound, 
upperBound);
+        nextLowerBound = upperBound;
+        producedBatches.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+    }
+
+    public void assertCleanUp() {
+        assertThat(closed).isTrue();
+        for (TestRecordIterator iterator : producedBatches) {
+            assertThat(iterator.released).isTrue();
+        }
+    }
+
+    private class TestRecordIterator implements RecordIterator {
+
+        private final int upperBound;
+
+        private int next;
+        private boolean released;
+
+        private TestRecordIterator(int lowerBound, int upperBound) {
+            this.upperBound = upperBound;
+
+            this.next = lowerBound;
+            this.released = false;
+        }
+
+        @Override
+        public KeyValue next() throws IOException {
+            assertThat(next != -1).isTrue();
+            if (next == upperBound) {
+                next = -1;
+                return null;
+            }
+            KeyValue result = reuse.update(testData.get(next));
+            next++;
+            return result;
+        }
+
+        @Override
+        public void releaseBatch() {
+            this.released = true;
+        }
+    }
+}

Reply via email to