This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 166b066744858fb63c83f384414db01408e49141
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 26 18:26:55 2022 +0800

    [FLINK-25820] Introduce FileStoreSourceSplitGenerator
---
 .../source/FileStoreSourceSplitGenerator.java      |  79 ++++++++++++++
 .../source/FileStoreSourceSplitGeneratorTest.java  | 119 +++++++++++++++++++++
 .../table/store/file/operation/FileStoreScan.java  |  20 ++++
 3 files changed, 218 insertions(+)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
new file mode 100644
index 0000000..217119d
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The {@code FileStoreSourceSplitGenerator}'s task is to plan all files to be read and to split
+ * them into a set of {@link FileStoreSourceSplit}.
+ */
+public class FileStoreSourceSplitGenerator {
+
+    /**
+     * The current Id as a mutable string representation. This covers more values than the integer
+     * value range, so we should never overflow.
+     */
+    private final char[] currentId = "0000000000".toCharArray();
+
+    public List<FileStoreSourceSplit> createSplits(FileStoreScan scan) {
+        return createSplits(scan.plan());
+    }
+
+    public List<FileStoreSourceSplit> createSplits(FileStoreScan.Plan plan) {
+        return plan.groupByPartFiles().entrySet().stream()
+                .flatMap(
+                        pe ->
+                                pe.getValue().entrySet().stream()
+                                        .map(
+                                                be ->
+                                                        new FileStoreSourceSplit(
+                                                                getNextId(),
+                                                                pe.getKey(),
+                                                                be.getKey(),
+                                                                be.getValue())))
+                .collect(Collectors.toList());
+    }
+
+    protected final String getNextId() {
+        // Because we just increment numbers, we increment the char representation directly,
+        // rather than incrementing an integer and converting it to a string representation
+        // every time (which would require quite some expensive conversion logic).
+        incrementCharArrayByOne(currentId, currentId.length - 1);
+        return new String(currentId);
+    }
+
+    private static void incrementCharArrayByOne(char[] array, int pos) {
+        if (pos < 0) {
+            throw new RuntimeException("Produced too many splits.");
+        }
+
+        char c = array[pos];
+        c++;
+
+        if (c > '9') {
+            c = '0';
+            incrementCharArrayByOne(array, pos - 1);
+        }
+        array[pos] = c;
+    }
+}
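To make the ID scheme concrete, here is a small hypothetical demo (SplitIdDemo is not part of this commit; it relies on same-package access to the protected getNextId()):

    package org.apache.flink.table.store.connector.source;

    // Hypothetical demo class: prints the first few split IDs to show the
    // mutable char-array counter in action.
    public class SplitIdDemo {
        public static void main(String[] args) {
            FileStoreSourceSplitGenerator generator = new FileStoreSourceSplitGenerator();
            // Prints 0000000001, 0000000002, 0000000003: the counter starts at
            // "0000000000" and is incremented before each ID is returned.
            for (int i = 0; i < 3; i++) {
                System.out.println(generator.getNextId());
            }
        }
    }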
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
new file mode 100644
index 0000000..355ddea
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FileStoreSourceSplitGenerator}. */
+public class FileStoreSourceSplitGeneratorTest {
+
+    @Test
+    public void test() {
+        FileStoreScan.Plan plan =
+                new FileStoreScan.Plan() {
+                    @Nullable
+                    @Override
+                    public Long snapshotId() {
+                        return null;
+                    }
+
+                    @Override
+                    public List<ManifestEntry> files() {
+                        return Arrays.asList(
+                                makeEntry(1, 0, "f0"),
+                                makeEntry(1, 0, "f1"),
+                                makeEntry(1, 1, "f2"),
+                                makeEntry(2, 0, "f3"),
+                                makeEntry(2, 0, "f4"),
+                                makeEntry(2, 0, "f5"),
+                                makeEntry(2, 1, "f6"),
+                                makeEntry(3, 0, "f7"),
+                                makeEntry(3, 1, "f8"),
+                                makeEntry(4, 0, "f9"),
+                                makeEntry(4, 1, "f10"),
+                                makeEntry(5, 0, "f11"),
+                                makeEntry(5, 1, "f12"),
+                                makeEntry(6, 0, "f13"),
+                                makeEntry(6, 1, "f14"));
+                    }
+                };
+        List<FileStoreSourceSplit> splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+        assertThat(splits.size()).isEqualTo(12);
+        splits.sort(
+                Comparator.comparingInt(o -> ((FileStoreSourceSplit) o).partition().getInt(0))
+                        .thenComparing(o -> ((FileStoreSourceSplit) o).bucket()));
+        assertSplit(splits.get(0), "0000000007", 1, 0, Arrays.asList("f0", "f1"));
+        assertSplit(splits.get(1), "0000000008", 1, 1, Collections.singletonList("f2"));
+        assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3", "f4", "f5"));
+        assertSplit(splits.get(3), "0000000004", 2, 1, Collections.singletonList("f6"));
+        assertSplit(splits.get(4), "0000000001", 3, 0, Collections.singletonList("f7"));
+        assertSplit(splits.get(5), "0000000002", 3, 1, Collections.singletonList("f8"));
+        assertSplit(splits.get(6), "0000000011", 4, 0, Collections.singletonList("f9"));
+        assertSplit(splits.get(7), "0000000012", 4, 1, Collections.singletonList("f10"));
+        assertSplit(splits.get(8), "0000000005", 5, 0, Collections.singletonList("f11"));
+        assertSplit(splits.get(9), "0000000006", 5, 1, Collections.singletonList("f12"));
+        assertSplit(splits.get(10), "0000000009", 6, 0, Collections.singletonList("f13"));
+        assertSplit(splits.get(11), "0000000010", 6, 1, Collections.singletonList("f14"));
+    }
+
+    private void assertSplit(
+            FileStoreSourceSplit split, String splitId, int part, int bucket, List<String> files) {
+        assertThat(split.splitId()).isEqualTo(splitId);
+        assertThat(split.partition().getInt(0)).isEqualTo(part);
+        assertThat(split.bucket()).isEqualTo(bucket);
+        assertThat(split.files().stream().map(SstFileMeta::fileName).collect(Collectors.toList()))
+                .isEqualTo(files);
+    }
+
+    private ManifestEntry makeEntry(int partition, int bucket, String fileName) {
+        return new ManifestEntry(
+                ValueKind.ADD,
+                row(partition), // not used
+                bucket, // not used
+                0, // not used
+                new SstFileMeta(
+                        fileName,
+                        0, // not used
+                        0, // not used
+                        null, // not used
+                        null, // not used
+                        new FieldStats[] {new FieldStats(null, null, 0)}, // not used
+                        0, // not used
+                        0, // not used
+                        0 // not used
+                        ));
+    }
+}
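A note on the expected split IDs in the test above: they are not assigned in partition order. groupByPartFiles() (added to FileStoreScan below) collects entries into HashMaps, so the generator visits the (partition, bucket) groups in hash-map iteration order and numbers the splits in that order; this is why the test sorts the splits by partition and bucket before asserting.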
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 2175f09..ab8e6ce 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,13 +19,20 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.predicate.Predicate;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Scan operation which produces a plan. */
 public interface FileStoreScan {
@@ -56,5 +63,18 @@ public interface FileStoreScan {
 
         /** Result {@link ManifestEntry} files. */
         List<ManifestEntry> files();
+
+        /** Return a map grouped by partition and bucket. */
+        default Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> groupByPartFiles() {
+            List<ManifestEntry> files = files();
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> groupBy = new HashMap<>();
+            for (ManifestEntry entry : files) {
+                checkArgument(entry.kind() == ValueKind.ADD);
+                groupBy.computeIfAbsent(entry.partition(), k -> new HashMap<>())
+                        .computeIfAbsent(entry.bucket(), k -> new ArrayList<>())
+                        .add(entry.file());
+            }
+            return groupBy;
+        }
     }
 }
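For reference, a minimal sketch of consuming the grouped plan directly (PlanGroupPrinter is illustrative only, not part of this commit; the nested map shape comes from groupByPartFiles() above):

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
    import org.apache.flink.table.store.file.operation.FileStoreScan;

    import java.util.List;
    import java.util.Map;

    // Hypothetical helper: prints one line per (partition, bucket) group,
    // mirroring how FileStoreSourceSplitGenerator turns each group into one
    // FileStoreSourceSplit.
    public class PlanGroupPrinter {
        public static void print(FileStoreScan.Plan plan) {
            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> grouped =
                    plan.groupByPartFiles();
            grouped.forEach(
                    (partition, buckets) ->
                            buckets.forEach(
                                    (bucket, files) ->
                                            System.out.println(
                                                    "bucket=" + bucket
                                                            + ", files=" + files.size())));
        }
    }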
