This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 7a9bca33d7189d157c39401139a988181348a909 Author: JingsongLi <[email protected]> AuthorDate: Thu Jan 13 17:00:48 2022 +0800 [FLINK-25629] Introduce IntervalPartition Co-authored-by: tsreaper <[email protected]> --- .../table/store/file/mergetree/SortedRun.java | 112 ++++++++++++ .../file/mergetree/compact/IntervalPartition.java | 126 ++++++++++++++ .../mergetree/compact/IntervalPartitionTest.java | 191 +++++++++++++++++++++ 3 files changed, 429 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java new file mode 100644 index 0000000..824bc3d --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * A {@link SortedRun} is a list of files sorted by their keys. The key intervals [minKey, maxKey] + * of these files do not overlap. + */ +public class SortedRun { + + private final List<SstFileMeta> files; + + private final long totalSize; + + private SortedRun(List<SstFileMeta> files) { + this.files = Collections.unmodifiableList(files); + long totalSize = 0L; + for (SstFileMeta file : files) { + totalSize += file.fileSize(); + } + this.totalSize = totalSize; + } + + public static SortedRun empty() { + return new SortedRun(Collections.emptyList()); + } + + public static SortedRun fromSingle(SstFileMeta file) { + return new SortedRun(Collections.singletonList(file)); + } + + public static SortedRun fromSorted(List<SstFileMeta> sortedFiles) { + return new SortedRun(sortedFiles); + } + + public static SortedRun fromUnsorted( + List<SstFileMeta> unsortedFiles, Comparator<RowData> keyComparator) { + unsortedFiles.sort((o1, o2) -> keyComparator.compare(o1.minKey(), o2.minKey())); + SortedRun run = new SortedRun(unsortedFiles); + run.validate(keyComparator); + return run; + } + + public List<SstFileMeta> files() { + return files; + } + + public boolean nonEmpty() { + return !files.isEmpty(); + } + + public long totalSize() { + return totalSize; + } + + @VisibleForTesting + public void validate(Comparator<RowData> comparator) { + for (int i = 1; i < files.size(); i++) { + Preconditions.checkState( + comparator.compare(files.get(i).minKey(), files.get(i - 1).maxKey()) > 0, + "SortedRun is not sorted and may contain overlapping key intervals. This is a bug."); + } + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof SortedRun)) { + return false; + } + SortedRun that = (SortedRun) o; + return files.equals(that.files); + } + + @Override + public int hashCode() { + return Objects.hash(files); + } + + @Override + public String toString() { + return "[" + + files.stream().map(SstFileMeta::toString).collect(Collectors.joining(", ")) + + "]"; + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java new file mode 100644 index 0000000..735246f --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.mergetree.SortedRun; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.stream.Collectors; + +/** Algorithm to partition several sst files into the minimum number of {@link SortedRun}s. */ +public class IntervalPartition { + + private final List<SstFileMeta> files; + private final Comparator<RowData> keyComparator; + + public IntervalPartition(List<SstFileMeta> inputFiles, Comparator<RowData> keyComparator) { + this.files = new ArrayList<>(inputFiles); + this.files.sort( + (o1, o2) -> { + int leftResult = keyComparator.compare(o1.minKey(), o2.minKey()); + return leftResult == 0 + ? keyComparator.compare(o1.maxKey(), o2.maxKey()) + : leftResult; + }); + this.keyComparator = keyComparator; + } + + /** + * Returns a two-dimensional list of {@link SortedRun}s. + * + * <p>The elements of the outer list are sections. Key intervals between sections do not + * overlap. This extra layer is to minimize the number of {@link SortedRun}s dealt at the same + * time. + * + * <p>The elements of the inner list are {@link SortedRun}s within a section. + * + * <p>Users are expected to use the results by this way: + * + * <pre>{@code + * for (List<SortedRun> section : algorithm.partition()) { + * // do some merge sorting within section + * } + * }</pre> + */ + public List<List<SortedRun>> partition() { + List<List<SortedRun>> result = new ArrayList<>(); + List<SstFileMeta> section = new ArrayList<>(); + BinaryRowData bound = null; + + for (SstFileMeta meta : files) { + if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) { + // larger than current right bound, conclude current section and create a new one + result.add(partition(section)); + section.clear(); + bound = null; + } + section.add(meta); + if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) { + // update right bound + bound = meta.maxKey(); + } + } + if (!section.isEmpty()) { + // conclude last section + result.add(partition(section)); + } + + return result; + } + + private List<SortedRun> partition(List<SstFileMeta> metas) { + PriorityQueue<List<SstFileMeta>> queue = + new PriorityQueue<>( + (o1, o2) -> + // sort by max key of the last sst file + keyComparator.compare( + o1.get(o1.size() - 1).maxKey(), + o2.get(o2.size() - 1).maxKey())); + // create the initial partition + List<SstFileMeta> firstRun = new ArrayList<>(); + firstRun.add(metas.get(0)); + queue.add(firstRun); + + for (int i = 1; i < metas.size(); i++) { + SstFileMeta meta = metas.get(i); + // any file list whose max key < meta.minKey() is sufficient, + // for convenience we pick the smallest + List<SstFileMeta> top = queue.poll(); + if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) { + // append current file to an existing partition + top.add(meta); + } else { + // create a new partition + List<SstFileMeta> newRun = new ArrayList<>(); + newRun.add(meta); + queue.add(newRun); + } + queue.add(top); + } + + // order between partitions does not matter + return queue.stream().map(SortedRun::fromSorted).collect(Collectors.toList()); + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java new file mode 100644 index 0000000..f59c1c2 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.writer.BinaryRowWriter; +import org.apache.flink.table.runtime.generated.RecordComparator; +import org.apache.flink.table.store.file.mergetree.SortedRun; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; +import org.apache.flink.table.store.file.stats.FieldStats; + +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link IntervalPartition}. */ +public class IntervalPartitionTest { + + private static final RecordComparator COMPARATOR = + (RecordComparator) (o1, o2) -> o1.getInt(0) - o2.getInt(0); + + @Test + public void testSameMinKey() { + runTest( + "[100, 200], [100, 400], [100, 300], [100, 500]", + "[100, 200] | [100, 300] | [100, 400] | [100, 500]"); + } + + @Test + public void testSameMaxKey() { + runTest( + "[100, 500], [300, 500], [200, 500], [400, 500]", + "[100, 500] | [200, 500] | [300, 500] | [400, 500]"); + } + + @Test + public void testSectionPartitioning() { + // 0 5 10 15 20 25 30 + // |--------| + // |-| + // |-----| + // |-----| + // |-----------| + // |-------| + // 0 5 10 15 20 25 30 + runTest( + "[0, 9], [5, 7], [9, 15], [16, 22], [16, 28], [24, 32]", + "[0, 9] | [5, 7], [9, 15]\n" + "[16, 22], [24, 32] | [16, 28]"); + } + + private void runTest(String in, String ans) { + IntervalPartition algorithm = new IntervalPartition(parseMetas(in), COMPARATOR); + List<List<SortedRun>> expected = new ArrayList<>(); + for (String line : ans.split("\n")) { + expected.add(parseSortedRuns(line)); + } + + List<List<SortedRun>> actual = algorithm.partition(); + for (List<SortedRun> section : actual) { + for (SortedRun sortedRun : section) { + sortedRun.validate(COMPARATOR); + } + } + + // compare the results with multiset because the order between sorted runs within a section + // does not matter + assertThat(toMultiset(actual)).isEqualTo(toMultiset(expected)); + } + + @RepeatedTest(100) + public void randomTest() { + ThreadLocalRandom r = ThreadLocalRandom.current(); + List<int[]> intervals = new ArrayList<>(); + // construct some sorted runs + int numSortedRuns = r.nextInt(10) + 1; + for (int i = 0; i < numSortedRuns; i++) { + int numIntervals = r.nextInt(10) + 1; + // pick 2 * numIntervals distinct integers to make intervals + Set<Integer> set = new TreeSet<>(); + while (set.size() < 2 * numIntervals) { + int x; + do { + x = r.nextInt(1000); + } while (set.contains(x)); + set.add(x); + } + List<Integer> ints = new ArrayList<>(set); + for (int j = 0; j < 2 * numIntervals; j += 2) { + intervals.add(new int[] {ints.get(j), ints.get(j + 1)}); + } + } + // change the input to string + String input = + intervals.stream() + .map(a -> String.format("[%d, %d]", a[0], a[1])) + .collect(Collectors.joining(", ")); + // maximum number of sorted runs after partitioning must not exceed numSortedRuns + IntervalPartition algorithm = new IntervalPartition(parseMetas(input), COMPARATOR); + List<List<SortedRun>> result = algorithm.partition(); + for (List<SortedRun> section : result) { + assertTrue(section.size() <= numSortedRuns); + for (SortedRun sortedRun : section) { + sortedRun.validate(COMPARATOR); + } + } + } + + private List<SortedRun> parseSortedRuns(String in) { + List<SortedRun> sortedRuns = new ArrayList<>(); + for (String s : in.split("\\|")) { + sortedRuns.add(SortedRun.fromSorted(parseMetas(s))); + } + return sortedRuns; + } + + private List<SstFileMeta> parseMetas(String in) { + List<SstFileMeta> metas = new ArrayList<>(); + Pattern pattern = Pattern.compile("\\[(\\d+?), (\\d+?)]"); + Matcher matcher = pattern.matcher(in); + while (matcher.find()) { + metas.add( + makeInterval( + Integer.parseInt(matcher.group(1)), + Integer.parseInt(matcher.group(2)))); + } + return metas; + } + + private SstFileMeta makeInterval(int left, int right) { + BinaryRowData minKey = new BinaryRowData(1); + BinaryRowWriter minWriter = new BinaryRowWriter(minKey); + minWriter.writeInt(0, left); + minWriter.complete(); + BinaryRowData maxKey = new BinaryRowData(1); + BinaryRowWriter maxWriter = new BinaryRowWriter(maxKey); + maxWriter.writeInt(0, right); + maxWriter.complete(); + + return new SstFileMeta( + "DUMMY", + 100, + 25, + minKey, + maxKey, + new FieldStats[] {new FieldStats(left, right, 0)}, + 0, + 24, + 0); + } + + private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>> sections) { + List<Map<SortedRun, Integer>> result = new ArrayList<>(); + for (List<SortedRun> section : sections) { + Map<SortedRun, Integer> multiset = new HashMap<>(); + for (SortedRun sortedRun : section) { + multiset.compute(sortedRun, (k, v) -> v == null ? 1 : v + 1); + } + result.add(multiset); + } + return result; + } +}
