This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 166b066744858fb63c83f384414db01408e49141
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 26 18:26:55 2022 +0800

    [FLINK-25820] Introduce FileStoreSourceSplitGenerator
---
 .../source/FileStoreSourceSplitGenerator.java      |  79 ++++++++++++++
 .../source/FileStoreSourceSplitGeneratorTest.java  | 119 +++++++++++++++++++++
 .../table/store/file/operation/FileStoreScan.java  |  20 ++++
 3 files changed, 218 insertions(+)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
new file mode 100644
index 0000000..217119d
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The {@code FileStoreSplitGenerator}'s task is to plan all files to be read 
and to split them into
+ * a set of {@link FileStoreSourceSplit}.
+ */
+public class FileStoreSourceSplitGenerator {
+
+    /**
+     * The current Id as a mutable string representation. This covers more 
values than the integer
+     * value range, so we should never overflow.
+     */
+    private final char[] currentId = "0000000000".toCharArray();
+
+    public List<FileStoreSourceSplit> createSplits(FileStoreScan scan) {
+        return createSplits(scan.plan());
+    }
+
+    public List<FileStoreSourceSplit> createSplits(FileStoreScan.Plan plan) {
+        return plan.groupByPartFiles().entrySet().stream()
+                .flatMap(
+                        pe ->
+                                pe.getValue().entrySet().stream()
+                                        .map(
+                                                be ->
+                                                        new 
FileStoreSourceSplit(
+                                                                getNextId(),
+                                                                pe.getKey(),
+                                                                be.getKey(),
+                                                                
be.getValue())))
+                .collect(Collectors.toList());
+    }
+
+    protected final String getNextId() {
+        // because we just increment numbers, we increment the char 
representation directly,
+        // rather than incrementing an integer and converting it to a string 
representation
+        // every time again (requires quite some expensive conversion logic).
+        incrementCharArrayByOne(currentId, currentId.length - 1);
+        return new String(currentId);
+    }
+
+    private static void incrementCharArrayByOne(char[] array, int pos) {
+        if (pos < 0) {
+            throw new RuntimeException("Produce too many splits.");
+        }
+
+        char c = array[pos];
+        c++;
+
+        if (c > '9') {
+            c = '0';
+            incrementCharArrayByOne(array, pos - 1);
+        }
+        array[pos] = c;
+    }
+}
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
new file mode 100644
index 0000000..355ddea
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FileStoreSourceSplitGenerator}. */
+public class FileStoreSourceSplitGeneratorTest {
+
+    @Test
+    public void test() {
+        FileStoreScan.Plan plan =
+                new FileStoreScan.Plan() {
+                    @Nullable
+                    @Override
+                    public Long snapshotId() {
+                        return null;
+                    }
+
+                    @Override
+                    public List<ManifestEntry> files() {
+                        return Arrays.asList(
+                                makeEntry(1, 0, "f0"),
+                                makeEntry(1, 0, "f1"),
+                                makeEntry(1, 1, "f2"),
+                                makeEntry(2, 0, "f3"),
+                                makeEntry(2, 0, "f4"),
+                                makeEntry(2, 0, "f5"),
+                                makeEntry(2, 1, "f6"),
+                                makeEntry(3, 0, "f7"),
+                                makeEntry(3, 1, "f8"),
+                                makeEntry(4, 0, "f9"),
+                                makeEntry(4, 1, "f10"),
+                                makeEntry(5, 0, "f11"),
+                                makeEntry(5, 1, "f12"),
+                                makeEntry(6, 0, "f13"),
+                                makeEntry(6, 1, "f14"));
+                    }
+                };
+        List<FileStoreSourceSplit> splits = new 
FileStoreSourceSplitGenerator().createSplits(plan);
+        assertThat(splits.size()).isEqualTo(12);
+        splits.sort(
+                Comparator.comparingInt(o -> ((FileStoreSourceSplit) 
o).partition().getInt(0))
+                        .thenComparing(o -> ((FileStoreSourceSplit) 
o).bucket()));
+        assertSplit(splits.get(0), "0000000007", 1, 0, Arrays.asList("f0", 
"f1"));
+        assertSplit(splits.get(1), "0000000008", 1, 1, 
Collections.singletonList("f2"));
+        assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3", 
"f4", "f5"));
+        assertSplit(splits.get(3), "0000000004", 2, 1, 
Collections.singletonList("f6"));
+        assertSplit(splits.get(4), "0000000001", 3, 0, 
Collections.singletonList("f7"));
+        assertSplit(splits.get(5), "0000000002", 3, 1, 
Collections.singletonList("f8"));
+        assertSplit(splits.get(6), "0000000011", 4, 0, 
Collections.singletonList("f9"));
+        assertSplit(splits.get(7), "0000000012", 4, 1, 
Collections.singletonList("f10"));
+        assertSplit(splits.get(8), "0000000005", 5, 0, 
Collections.singletonList("f11"));
+        assertSplit(splits.get(9), "0000000006", 5, 1, 
Collections.singletonList("f12"));
+        assertSplit(splits.get(10), "0000000009", 6, 0, 
Collections.singletonList("f13"));
+        assertSplit(splits.get(11), "0000000010", 6, 1, 
Collections.singletonList("f14"));
+    }
+
+    private void assertSplit(
+            FileStoreSourceSplit split, String splitId, int part, int bucket, 
List<String> files) {
+        assertThat(split.splitId()).isEqualTo(splitId);
+        assertThat(split.partition().getInt(0)).isEqualTo(part);
+        assertThat(split.bucket()).isEqualTo(bucket);
+        
assertThat(split.files().stream().map(SstFileMeta::fileName).collect(Collectors.toList()))
+                .isEqualTo(files);
+    }
+
+    private ManifestEntry makeEntry(int partition, int bucket, String 
fileName) {
+        return new ManifestEntry(
+                ValueKind.ADD,
+                row(partition), // not used
+                bucket, // not used
+                0, // not used
+                new SstFileMeta(
+                        fileName,
+                        0, // not used
+                        0, // not used
+                        null, // not used
+                        null, // not used
+                        new FieldStats[] {new FieldStats(null, null, 0)}, // 
not used
+                        0, // not used
+                        0, // not used
+                        0 // not used
+                        ));
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 2175f09..ab8e6ce 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,13 +19,20 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.predicate.Predicate;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Scan operation which produces a plan. */
 public interface FileStoreScan {
@@ -56,5 +63,18 @@ public interface FileStoreScan {
 
         /** Result {@link ManifestEntry} files. */
         List<ManifestEntry> files();
+
+        /** Return a map group by partition and bucket. */
+        default Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> 
groupByPartFiles() {
+            List<ManifestEntry> files = files();
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> groupBy = new 
HashMap<>();
+            for (ManifestEntry entry : files) {
+                checkArgument(entry.kind() == ValueKind.ADD);
+                groupBy.computeIfAbsent(entry.partition(), k -> new 
HashMap<>())
+                        .computeIfAbsent(entry.bucket(), k -> new 
ArrayList<>())
+                        .add(entry.file());
+            }
+            return groupBy;
+        }
     }
 }

Reply via email to