This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit ac9d902bef8959742891ff63d435aa498507f084
Author: JingsongLi <[email protected]>
AuthorDate: Thu Jan 13 17:05:57 2022 +0800

    [FLINK-25629] Introduce CompactManager

    This closes #5
---
 .../flink/table/store/file/mergetree/Levels.java   | 138 ++++++++++
 .../file/mergetree/compact/CompactManager.java     | 214 +++++++++++++++
 .../file/mergetree/compact/CompactManagerTest.java | 288 +++++++++++++++++++++
 3 files changed, 640 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
new file mode 100644
index 0000000..5c99b04
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A class which stores all level files of the merge tree. */
+public class Levels {
+
+    private final Comparator<RowData> keyComparator;
+
+    private final TreeSet<SstFileMeta> level0;
+
+    private final List<SortedRun> levels;
+
+    public Levels(Comparator<RowData> keyComparator, List<SstFileMeta> inputFiles, int numLevels) {
+        this.keyComparator = keyComparator;
+        checkArgument(numLevels > 1, "Number of levels must be at least 2.");
+        this.level0 =
+                new TreeSet<>(Comparator.comparing(SstFileMeta::maxSequenceNumber).reversed());
+        this.levels = new ArrayList<>();
+        for (int i = 1; i < numLevels; i++) {
+            levels.add(SortedRun.empty());
+        }
+
+        Map<Integer, List<SstFileMeta>> levelMap = new HashMap<>();
+        for (SstFileMeta file : inputFiles) {
+            levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
+        }
+        levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));
+    }
+
+    public void addLevel0File(SstFileMeta file) {
+        checkArgument(file.level() == 0);
+        level0.add(file);
+    }
+
+    public SortedRun runOfLevel(int level) {
+        checkArgument(level > 0, "Level 0 does not have a single sorted run.");
+        return levels.get(level - 1);
+    }
+
+    public int numberOfLevels() {
+        return levels.size() + 1;
+    }
+
+    /** @return the highest non-empty level, or -1 if all levels are empty. */
+    public int nonEmptyHighestLevel() {
+        int i;
+        for (i = levels.size() - 1; i >= 0; i--) {
+            if (levels.get(i).nonEmpty()) {
+                return i + 1;
+            }
+        }
+        return level0.isEmpty() ? -1 : 0;
+    }
+
+    public List<SstFileMeta> allFiles() {
+        List<SstFileMeta> files = new ArrayList<>();
+        List<LevelSortedRun> runs = levelSortedRuns();
+        for (LevelSortedRun run : runs) {
+            files.addAll(run.run().files());
+        }
+        return files;
+    }
+
+    public List<LevelSortedRun> levelSortedRuns() {
+        List<LevelSortedRun> runs = new ArrayList<>();
+        level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
+        for (int i = 0; i < levels.size(); i++) {
+            SortedRun run = levels.get(i);
+            if (run.nonEmpty()) {
+                runs.add(new LevelSortedRun(i + 1, run));
+            }
+        }
+        return runs;
+    }
+
+    public void update(List<SstFileMeta> before, List<SstFileMeta> after) {
+        Map<Integer, List<SstFileMeta>> groupedBefore = groupByLevel(before);
+        Map<Integer, List<SstFileMeta>> groupedAfter = groupByLevel(after);
+        for (int i = 0; i < numberOfLevels(); i++) {
+            updateLevel(
+                    i,
+                    groupedBefore.getOrDefault(i, emptyList()),
+                    groupedAfter.getOrDefault(i, emptyList()));
+        }
+    }
+
+    private void updateLevel(int level, List<SstFileMeta> before, List<SstFileMeta> after) {
+        if (before.isEmpty() && after.isEmpty()) {
+            return;
+        }
+
+        if (level == 0) {
+            before.forEach(level0::remove);
+            level0.addAll(after);
+        } else {
+            List<SstFileMeta> files = new ArrayList<>(runOfLevel(level).files());
+            files.removeAll(before);
+            files.addAll(after);
+            levels.set(level - 1, SortedRun.fromUnsorted(files, keyComparator));
+        }
+    }
+
+    private Map<Integer, List<SstFileMeta>> groupByLevel(List<SstFileMeta> files) {
+        return files.stream()
+                .collect(Collectors.groupingBy(SstFileMeta::level, Collectors.toList()));
+    }
+}
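(Not part of the patch.) To make the Levels invariants concrete: level 0 keeps one sorted run per file, ordered newest-first by max sequence number, while each level >= 1 is a single SortedRun. A minimal sketch; it assumes the SstFileMeta constructor arguments follow the newFile() helper in CompactManagerTest further down, and the argument meanings in the comment are an educated guess:

    // Sketch only, not part of this commit. row() is the test helper below.
    Comparator<RowData> comparator = Comparator.comparingInt(o -> o.getInt(0));
    List<SstFileMeta> files =
            Arrays.asList(
                    // assumed: (name, fileSize, rowCount?, minKey, maxKey, stats, minSeq, maxSeq, level)
                    new SstFileMeta("f0", 3, 1, row(1), row(3), null, 0, 5, 0),
                    new SstFileMeta("f1", 5, 1, row(1), row(5), null, 0, 3, 1));
    Levels levels = new Levels(comparator, files, 3);

    // One run for the level-0 file plus one run for non-empty level 1.
    assert levels.levelSortedRuns().size() == 2;
    // Level 1 is the highest non-empty level.
    assert levels.nonEmptyHighestLevel() == 1;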
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
new file mode 100644
index 0000000..f20bae4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import static java.util.Collections.singletonList;
+
+/** Manager to submit compaction tasks. */
+public class CompactManager {
+
+    private final ExecutorService executor;
+
+    private final CompactStrategy strategy;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final long minFileSize;
+
+    private final Rewriter rewriter;
+
+    private Future<CompactResult> taskFuture;
+
+    public CompactManager(
+            ExecutorService executor,
+            CompactStrategy strategy,
+            Comparator<RowData> keyComparator,
+            long minFileSize,
+            Rewriter rewriter) {
+        this.executor = executor;
+        this.minFileSize = minFileSize;
+        this.keyComparator = keyComparator;
+        this.strategy = strategy;
+        this.rewriter = rewriter;
+    }
+
+    /** Submit a new compaction task. */
+    public void submitCompaction(Levels levels) {
+        if (taskFuture != null) {
+            throw new IllegalStateException(
+                    "Please finish the previous compaction before submitting a new one.");
+        }
+        strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
+                .ifPresent(
+                        unit -> {
+                            if (unit.files().size() < 2) {
+                                return;
+                            }
+                            boolean dropDelete =
+                                    unit.outputLevel() >= levels.nonEmptyHighestLevel();
+                            CompactTask task = new CompactTask(unit, dropDelete);
+                            taskFuture = executor.submit(task);
+                        });
+    }
+
+    /** Finish the current task, and update the result files in {@link Levels}. */
+    public Optional<CompactResult> finishCompaction(Levels levels)
+            throws ExecutionException, InterruptedException {
+        if (taskFuture != null) {
+            CompactResult result = taskFuture.get();
+            levels.update(result.before(), result.after());
+            taskFuture = null;
+            return Optional.of(result);
+        }
+        return Optional.empty();
+    }
+
+    /** Rewrites sections into new files. */
+    @FunctionalInterface
+    public interface Rewriter {
+
+        List<SstFileMeta> rewrite(
+                int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
+                throws Exception;
+    }
+
+    /** Result of compaction. */
+    public interface CompactResult {
+
+        List<SstFileMeta> before();
+
+        List<SstFileMeta> after();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Internal classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Compaction task. */
+    private class CompactTask implements Callable<CompactResult> {
+
+        private final int outputLevel;
+
+        private final List<List<SortedRun>> partitioned;
+
+        private final boolean dropDelete;
+
+        private CompactTask(CompactUnit unit, boolean dropDelete) {
+            this.outputLevel = unit.outputLevel();
+            this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
+            this.dropDelete = dropDelete;
+        }
+
+        @Override
+        public CompactResult call() throws Exception {
+            return compact();
+        }
+
+        private CompactResult compact() throws Exception {
+            List<List<SortedRun>> candidate = new ArrayList<>();
+            List<SstFileMeta> before = new ArrayList<>();
+            List<SstFileMeta> after = new ArrayList<>();
+
+            // Check the order and compact adjacent and contiguous files.
+            // Note: we cannot skip an intermediate file to compact; that would destroy
+            // the overall ordering.
+            for (List<SortedRun> section : partitioned) {
+                if (section.size() > 1) {
+                    candidate.add(section);
+                } else {
+                    SortedRun run = section.get(0);
+                    // No overlapping:
+                    // We can upgrade a large file by simply changing its level instead of
+                    // rewriting it.
+                    // But for small files, we will try to compact them.
+                    for (SstFileMeta file : run.files()) {
+                        if (file.fileSize() < minFileSize) {
+                            // Small files are rewritten along with the previous files.
+                            candidate.add(singletonList(SortedRun.fromSingle(file)));
+                        } else {
+                            // A large file appears: rewrite the previous files and upgrade it.
+                            rewrite(candidate, before, after);
+                            upgrade(file, before, after);
+                        }
+                    }
+                }
+            }
+            rewrite(candidate, before, after);
+            return result(before, after);
+        }
+
+        private void upgrade(SstFileMeta file, List<SstFileMeta> before, List<SstFileMeta> after) {
+            if (file.level() != outputLevel) {
+                before.add(file);
+                after.add(file.upgrade(outputLevel));
+            }
+        }
+
+        private void rewrite(
+                List<List<SortedRun>> candidate, List<SstFileMeta> before, List<SstFileMeta> after)
+                throws Exception {
+            if (candidate.isEmpty()) {
+                return;
+            }
+            if (candidate.size() == 1) {
+                List<SortedRun> section = candidate.get(0);
+                if (section.size() == 0) {
+                    return;
+                } else if (section.size() == 1) {
+                    for (SstFileMeta file : section.get(0).files()) {
+                        upgrade(file, before, after);
+                    }
+                    candidate.clear();
+                    return;
+                }
+            }
+            candidate.forEach(runs -> runs.forEach(run -> before.addAll(run.files())));
+            after.addAll(rewriter.rewrite(outputLevel, dropDelete, candidate));
+            candidate.clear();
+        }
+
+        private CompactResult result(List<SstFileMeta> before, List<SstFileMeta> after) {
+            return new CompactResult() {
+                @Override
+                public List<SstFileMeta> before() {
+                    return before;
+                }
+
+                @Override
+                public List<SstFileMeta> after() {
+                    return after;
+                }
+            };
+        }
+    }
+}
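(Not part of the patch.) The intended lifecycle is pick -> submit -> finish: the strategy picks a CompactUnit, the CompactTask runs on the executor, and finishCompaction() applies the task's before/after file lists back to Levels. A minimal driver sketch, reusing "comparator" and "levels" from the Levels sketch above; the rewriter here is a stub, so finishCompaction() would simply rethrow its exception if a rewrite were actually triggered:

    // Sketch only, not part of this commit.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // Toy strategy: always compact every run into the bottom level.
    CompactStrategy strategy =
            (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
    // Stub rewriter; a real one merge-sorts the key-overlapping sections and
    // writes the merged records into new files on outputLevel.
    CompactManager.Rewriter rewriter =
            (outputLevel, dropDelete, sections) -> {
                throw new UnsupportedOperationException("stub");
            };
    CompactManager manager =
            new CompactManager(executor, strategy, comparator, 1024, rewriter);

    manager.submitCompaction(levels); // picks a unit, runs a CompactTask asynchronously
    manager.finishCompaction(levels); // blocks on the task, applies before/after
    executor.shutdownNow();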
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
new file mode 100644
index 0000000..560e298
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompactManager}. */
+public class CompactManagerTest {
+
+    private final Comparator<RowData> comparator = Comparator.comparingInt(o -> o.getInt(0));
+
+    private static ExecutorService service;
+
+    @BeforeAll
+    public static void before() {
+        service = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterAll
+    public static void after() {
+        service.shutdownNow();
+        service = null;
+    }
+
+    @Test
+    public void testCompactToPenultimateLevel() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(2, 1, 7)),
+                Arrays.asList(new LevelMinMax(1, 1, 5), new LevelMinMax(2, 1, 7)),
+                new CompactStrategy() {
+                    @Override
+                    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
+                        return Optional.of(CompactUnit.fromLevelRuns(1, runs.subList(0, 2)));
+                    }
+                },
+                false);
+    }
+
+    @Test
+    public void testNoCompaction() throws ExecutionException, InterruptedException {
+        innerTest(
+                Collections.singletonList(new LevelMinMax(0, 1, 3)),
+                Collections.singletonList(new LevelMinMax(0, 1, 3)));
+    }
+
+    @Test
+    public void testNormal() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(1, 1, 5),
+                        new LevelMinMax(1, 6, 7)),
+                Arrays.asList(new LevelMinMax(2, 1, 5), new LevelMinMax(2, 6, 7)));
+    }
+
+    @Test
+    public void testUpgrade() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 8)),
+                Arrays.asList(new LevelMinMax(2, 1, 5), new LevelMinMax(2, 6, 8)));
+    }
+
+    @Test
+    public void testSmallFiles() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(new LevelMinMax(0, 1, 1), new LevelMinMax(0, 2, 2)),
+                Collections.singletonList(new LevelMinMax(2, 1, 2)));
+    }
+
+    @Test
+    public void testSmallFilesNoCompact() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 7, 8),
+                        new LevelMinMax(1, 9, 10)),
+                Arrays.asList(
+                        new LevelMinMax(2, 1, 5),
+                        new LevelMinMax(2, 6, 6),
+                        new LevelMinMax(2, 7, 8),
+                        new LevelMinMax(2, 9, 10)));
+    }
+
+    @Test
+    public void testSmallFilesCrossLevel() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 7, 7),
+                        new LevelMinMax(1, 9, 10)),
+                Arrays.asList(
+                        new LevelMinMax(2, 1, 5),
+                        new LevelMinMax(2, 6, 7),
+                        new LevelMinMax(2, 9, 10)));
+    }
+
+    @Test
+    public void testComplex() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 1, 4),
+                        new LevelMinMax(1, 6, 8),
+                        new LevelMinMax(1, 10, 11),
+                        new LevelMinMax(2, 1, 3),
+                        new LevelMinMax(2, 4, 6)),
+                Arrays.asList(new LevelMinMax(2, 1, 8), new LevelMinMax(2, 10, 11)));
+    }
+
+    @Test
+    public void testSmallInComplex() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 1, 4),
+                        new LevelMinMax(1, 6, 8),
+                        new LevelMinMax(1, 10, 10),
+                        new LevelMinMax(2, 1, 3),
+                        new LevelMinMax(2, 4, 6)),
+                Collections.singletonList(new LevelMinMax(2, 1, 10)));
+    }
+
+    private void innerTest(List<LevelMinMax> inputs, List<LevelMinMax> expected)
+            throws ExecutionException, InterruptedException {
+        innerTest(inputs, expected, testStrategy(), true);
+    }
+
+    private void innerTest(
+            List<LevelMinMax> inputs,
+            List<LevelMinMax> expected,
+            CompactStrategy strategy,
+            boolean expectedDropDelete)
+            throws ExecutionException, InterruptedException {
+        List<SstFileMeta> files = new ArrayList<>();
+        for (int i = 0; i < inputs.size(); i++) {
+            LevelMinMax minMax = inputs.get(i);
+            files.add(minMax.toFile(i));
+        }
+        Levels levels = new Levels(comparator, files, 3);
+        CompactManager manager =
+                new CompactManager(
+                        service, strategy, comparator, 2, testRewriter(expectedDropDelete));
+        manager.submitCompaction(levels);
+        manager.finishCompaction(levels);
+        List<LevelMinMax> outputs =
+                levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList());
+        assertThat(outputs).isEqualTo(expected);
+    }
+
+    private static SstFileMeta newFile(int level, int minKey, int maxKey, long maxSequence) {
+        return new SstFileMeta(
+                "", maxKey - minKey + 1, 1, row(minKey), row(maxKey), null, 0, maxSequence, level);
+    }
+
+    public static BinaryRowData row(int i) {
+        BinaryRowData row = new BinaryRowData(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeInt(0, i);
+        writer.complete();
+        return row;
+    }
+
+    private CompactStrategy testStrategy() {
+        return (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
+    }
+
+    private CompactManager.Rewriter testRewriter(boolean expectedDropDelete) {
+        return (outputLevel, dropDelete, sections) -> {
+            assertThat(dropDelete).isEqualTo(expectedDropDelete);
+            int minKey = Integer.MAX_VALUE;
+            int maxKey = Integer.MIN_VALUE;
+            long maxSequence = 0;
+            for (List<SortedRun> section : sections) {
+                for (SortedRun run : section) {
+                    for (SstFileMeta file : run.files()) {
+                        int min = file.minKey().getInt(0);
+                        int max = file.maxKey().getInt(0);
+                        if (min < minKey) {
+                            minKey = min;
+                        }
+                        if (max > maxKey) {
+                            maxKey = max;
+                        }
+                        if (file.maxSequenceNumber() > maxSequence) {
+                            maxSequence = file.maxSequenceNumber();
+                        }
+                    }
+                }
+            }
+            return Collections.singletonList(newFile(outputLevel, minKey, maxKey, maxSequence));
+        };
+    }
+
+    private static class LevelMinMax {
+
+        private final int level;
+        private final int min;
+        private final int max;
+
+        private LevelMinMax(SstFileMeta file) {
+            this.level = file.level();
+            this.min = file.minKey().getInt(0);
+            this.max = file.maxKey().getInt(0);
+        }
+
+        private LevelMinMax(int level, int min, int max) {
+            this.level = level;
+            this.min = min;
+            this.max = max;
+        }
+
+        private SstFileMeta toFile(long maxSequence) {
+            return newFile(level, min, max, maxSequence);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            LevelMinMax that = (LevelMinMax) o;
+            return level == that.level && min == that.min && max == that.max;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(level, min, max);
+        }
+
+        @Override
+        public String toString() {
+            return "LevelMinMax{" + "level=" + level + ", min=" + min + ", max=" + max + '}';
+        }
+    }
+}
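(Not part of the patch.) The sectioning that CompactTask relies on comes from IntervalPartition, which this commit does not include. Assuming, as compact() depends on, that partition() groups transitively key-overlapping files into one section, a sketch using the newFile() helper above:

    // Sketch only, not part of this commit. Assumed IntervalPartition behavior.
    List<SstFileMeta> input =
            Arrays.asList(
                    newFile(0, 1, 3, 1), // keys [1, 3]
                    newFile(0, 2, 5, 2), // keys [2, 5], overlaps [1, 3]
                    newFile(1, 6, 8, 3)); // keys [6, 8], no overlap
    List<List<SortedRun>> sections = new IntervalPartition(input, comparator).partition();
    // [1,3] and [2,5] form one multi-run section and must be rewritten together;
    // [6,8] sits alone in its own section, so compact() may upgrade it in place
    // (or mark it for rewriting if it is smaller than minFileSize).
    assert sections.size() == 2;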
