This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c80f600c [FLINK-27696] Add bin-pack strategy to split the whole bucket 
data files into several small splits
c80f600c is described below

commit c80f600c149fc90f8522f43af67f0ab2a713b57f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 21 13:46:24 2022 +0800

    [FLINK-27696] Add bin-pack strategy to split the whole bucket data files 
into several small splits
    
    This closes #165
---
 .../source/FileStoreSourceSplitGeneratorTest.java  |  14 +--
 .../flink/table/store/file/FileStoreOptions.java   |  27 ++++++
 .../flink/table/store/file/KeyValueFileStore.java  |   6 +-
 .../table/store/file/data/DataFilePathFactory.java |   6 +-
 .../store/file/utils/FileStorePathFactory.java     |   4 +
 .../store/table/AppendOnlyFileStoreTable.java      |   9 ++
 .../table/ChangelogValueCountFileStoreTable.java   |  11 +++
 .../table/ChangelogWithKeyFileStoreTable.java      |  11 +++
 ...enerator.java => AppendOnlySplitGenerator.java} |  25 +++--
 .../store/table/source/DefaultSplitGenerator.java  |  60 ------------
 .../table/source/MergeTreeSplitGenerator.java      | 106 +++++++++++++++++++++
 .../table/store/table/source/SplitGenerator.java   |   9 +-
 .../flink/table/store/table/source/TableScan.java  |  36 ++++++-
 .../table/store/file/data/DataFileTestUtils.java   |  58 +++++++++++
 .../file/mergetree/compact/CompactManagerTest.java |  24 +----
 .../store/table/source/SplitGeneratorTest.java     |  96 +++++++++++++++++++
 16 files changed, 394 insertions(+), 108 deletions(-)

diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 6af47d5d..76331ef4 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.table.source.DefaultSplitGenerator;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableScan;
 
 import org.junit.jupiter.api.Test;
@@ -77,12 +77,12 @@ public class FileStoreSourceSplitGeneratorTest {
                                 makeEntry(6, 1, "f14"));
                     }
                 };
-        TableScan.Plan tableScanPlan =
-                new TableScan.Plan(
-                        1L,
-                        new DefaultSplitGenerator(
-                                        new FileStorePathFactory(new 
Path(tempDir.toString())))
-                                .generate(plan.groupByPartFiles()));
+        List<Split> scanSplits =
+                TableScan.generateSplits(
+                        new FileStorePathFactory(new 
Path(tempDir.toUri().toString())),
+                        Collections::singletonList,
+                        plan.groupByPartFiles());
+        TableScan.Plan tableScanPlan = new TableScan.Plan(1L, scanSplits);
 
         List<FileStoreSourceSplit> splits =
                 new 
FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 569ecc4b..0d01be5d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -141,6 +141,20 @@ public class FileStoreOptions implements Serializable {
                                     
.list(formatEnumOption(WriteMode.CHANGE_LOG))
                                     .build());
 
+    public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
+            ConfigOptions.key("source.split.target-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription("Target size of a source split when 
scanning a bucket.");
+
+    public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
+            ConfigOptions.key("source.split.open-file-cost")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(4))
+                    .withDescription(
+                            "Open file cost of a source file. It is used to 
avoid reading"
+                                    + " too many files with a source split, 
which can be very slow.");
+
     private final Configuration options;
 
     public static Set<ConfigOption<?>> allOptions() {
@@ -155,6 +169,11 @@ public class FileStoreOptions implements Serializable {
         allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
         allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
         allOptions.add(SNAPSHOT_TIME_RETAINED);
+        allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
+        allOptions.add(MERGE_ENGINE);
+        allOptions.add(WRITE_MODE);
+        allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
+        allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
         return allOptions;
     }
 
@@ -237,6 +256,14 @@ public class FileStoreOptions implements Serializable {
         return options.get(MANIFEST_MERGE_MIN_COUNT);
     }
 
+    public long splitTargetSize() {
+        return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
+    }
+
+    public long splitOpenFileCost() {
+        return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index ec6a6335..6d2dc3a8 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -66,7 +66,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 schemaId,
                 keyType,
                 valueType,
-                keyComparatorSupplier.get(),
+                newKeyComparator(),
                 mergeFunction,
                 options.fileFormat(),
                 pathFactory());
@@ -98,4 +98,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 options.bucket(),
                 checkNumOfBuckets);
     }
+
+    public Comparator<RowData> newKeyComparator() {
+        return keyComparatorSupplier.get();
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
index d27a3ff8..ce573e61 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
@@ -37,7 +37,7 @@ public class DataFilePathFactory {
     private final String formatIdentifier;
 
     public DataFilePathFactory(Path root, String partition, int bucket, String 
formatIdentifier) {
-        this.bucketDir = new Path(root + "/" + partition + "/bucket-" + 
bucket);
+        this.bucketDir = bucketPath(root, partition, bucket);
         this.uuid = UUID.randomUUID().toString();
 
         this.pathCount = new AtomicInteger(0);
@@ -68,4 +68,8 @@ public class DataFilePathFactory {
     public String uuid() {
         return uuid;
     }
+
+    public static Path bucketPath(Path tablePath, String partition, int 
bucket) {
+        return new Path(tablePath + "/" + partition + "/bucket-" + bucket);
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index a992a74e..d4db70b5 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -111,6 +111,10 @@ public class FileStorePathFactory {
                 root, getPartitionString(partition), bucket, formatIdentifier);
     }
 
+    public Path bucketPath(BinaryRowData partition, int bucket) {
+        return DataFilePathFactory.bucketPath(root, 
getPartitionString(partition), bucket);
+    }
+
     /** IMPORTANT: This method is NOT THREAD SAFE. */
     public String getPartitionString(BinaryRowData partition) {
         return PartitionPathUtils.generatePartitionPath(
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 7f0de998..a30f1f61 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -29,12 +29,15 @@ import 
org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
+import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.types.RowKind;
@@ -66,6 +69,12 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     public TableScan newScan() {
         AppendOnlyFileStoreScan scan = store.newScan();
         return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected SplitGenerator splitGenerator(FileStorePathFactory 
pathFactory) {
+                return new AppendOnlySplitGenerator(
+                        store.options().splitTargetSize(), 
store.options().splitOpenFileCost());
+            }
+
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
                 scan.withFilter(predicate);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index c9f85f80..b41b5b78 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.sink.AbstractTableWrite;
@@ -38,6 +39,8 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
+import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
+import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import 
org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -75,6 +78,14 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     public TableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
         return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected SplitGenerator splitGenerator(FileStorePathFactory 
pathFactory) {
+                return new MergeTreeSplitGenerator(
+                        store.newKeyComparator(),
+                        store.options().splitTargetSize(),
+                        store.options().splitOpenFileCost());
+            }
+
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
                 scan.withKeyFilter(predicate);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 7109a91a..8421c811 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.sink.AbstractTableWrite;
@@ -40,6 +41,8 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
+import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
+import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import 
org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
@@ -111,6 +114,14 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
     public TableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
         return new TableScan(scan, schema, store.pathFactory()) {
+            @Override
+            protected SplitGenerator splitGenerator(FileStorePathFactory 
pathFactory) {
+                return new MergeTreeSplitGenerator(
+                        store.newKeyComparator(),
+                        store.options().splitTargetSize(),
+                        store.options().splitOpenFileCost());
+            }
+
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
                 // currently we can only perform filter push down on keys
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AppendOnlySplitGenerator.java
similarity index 56%
copy from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
copy to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AppendOnlySplitGenerator.java
index fb0ce382..cca5cca2 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AppendOnlySplitGenerator.java
@@ -18,17 +18,26 @@
 
 package org.apache.flink.table.store.table.source;
 
-import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.connector.file.table.BinPacking;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 
 import java.util.List;
-import java.util.Map;
+import java.util.function.Function;
 
-/**
- * Generate {@link Split}s from a map with partition and bucket as keys and 
{@link DataFileMeta}s as
- * values.
- */
-public interface SplitGenerator {
+/** Append only implementation of {@link SplitGenerator}. */
+public class AppendOnlySplitGenerator implements SplitGenerator {
+
+    private final long targetSplitSize;
+    private final long openFileCost;
+
+    public AppendOnlySplitGenerator(long targetSplitSize, long openFileCost) {
+        this.targetSplitSize = targetSplitSize;
+        this.openFileCost = openFileCost;
+    }
 
-    List<Split> generate(Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFileMetas);
+    @Override
+    public List<List<DataFileMeta>> split(List<DataFileMeta> files) {
+        Function<DataFileMeta, Long> weightFunc = file -> 
Math.max(file.fileSize(), openFileCost);
+        return BinPacking.pack(files, weightFunc, targetSplitSize);
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
deleted file mode 100644
index 01107bb8..00000000
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source;
-
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/** Default implementation of {@link SplitGenerator}. The whole bucket is 
marked as one split. */
-public class DefaultSplitGenerator implements SplitGenerator {
-
-    private final FileStorePathFactory pathFactory;
-
-    public DefaultSplitGenerator(FileStorePathFactory pathFactory) {
-        this.pathFactory = pathFactory;
-    }
-
-    @Override
-    public List<Split> generate(
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFileMetas) {
-        List<Split> splits = new ArrayList<>();
-        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
entryWithPartition :
-                groupedDataFileMetas.entrySet()) {
-            BinaryRowData partition = entryWithPartition.getKey();
-            for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
-                    entryWithPartition.getValue().entrySet()) {
-                int bucket = entryWithBucket.getKey();
-                splits.add(
-                        new Split(
-                                partition,
-                                bucket,
-                                entryWithBucket.getValue(),
-                                pathFactory
-                                        .createDataFilePathFactory(partition, 
bucket)
-                                        .bucketPath()));
-            }
-        }
-        return splits;
-    }
-}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/MergeTreeSplitGenerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/MergeTreeSplitGenerator.java
new file mode 100644
index 00000000..9a70266a
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/MergeTreeSplitGenerator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.connector.file.table.BinPacking;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Merge tree implementation of {@link SplitGenerator}. */
+public class MergeTreeSplitGenerator implements SplitGenerator {
+
+    private final Comparator<RowData> keyComparator;
+
+    private final long targetSplitSize;
+
+    private final long openFileCost;
+
+    public MergeTreeSplitGenerator(
+            Comparator<RowData> keyComparator, long targetSplitSize, long 
openFileCost) {
+        this.keyComparator = keyComparator;
+        this.targetSplitSize = targetSplitSize;
+        this.openFileCost = openFileCost;
+    }
+
+    @Override
+    public List<List<DataFileMeta>> split(List<DataFileMeta> files) {
+        /*
+         * The generator aims to parallel the scan execution by slicing the 
files of each bucket
+         * into multiple splits. The generation has one constraint: files with 
intersected key
+         * ranges (within one section) must go to the same split. Therefore, 
the files are first to go
+         * through the interval partition algorithm to generate sections and 
then through the
+         * BinPack algorithm. Note that the item to be packed here is each 
section, the bin capacity
+         * is denoted as the targetSplitSize, and the final number of the bins 
is the number of
+         * splits generated.
+         *
+         * For instance, there are files: [1, 2] [3, 4] [5, 180] [5, 190] 
[200, 600] [210, 700]
+         * with targetSplitSize 128M. After interval partition, there are four 
sections:
+         * - section1: [1, 2]
+         * - section2: [3, 4]
+         * - section3: [5, 180], [5, 190]
+         * - section4: [200, 600], [210, 700]
+         *
+         * After BinPack, section1 and section2 will be put into one bin 
(split), so the final result will be:
+         * - split1: [1, 2] [3, 4]
+         * - split2: [5, 180] [5,190]
+         * - split3: [200, 600] [210, 700]
+         */
+        List<List<DataFileMeta>> sections =
+                new IntervalPartition(files, keyComparator)
+                        
.partition().stream().map(this::flatRun).collect(Collectors.toList());
+
+        return binPackSplits(sections);
+    }
+
+    private List<List<DataFileMeta>> binPackSplits(List<List<DataFileMeta>> 
sections) {
+        Function<List<DataFileMeta>, Long> weightFunc =
+                file -> Math.max(totalSize(file), openFileCost);
+        return BinPacking.pack(sections, weightFunc, targetSplitSize).stream()
+                .map(this::flatFiles)
+                .collect(Collectors.toList());
+    }
+
+    private long totalSize(List<DataFileMeta> section) {
+        long size = 0L;
+        for (DataFileMeta file : section) {
+            size += file.fileSize();
+        }
+        return size;
+    }
+
+    private List<DataFileMeta> flatRun(List<SortedRun> section) {
+        List<DataFileMeta> files = new ArrayList<>();
+        section.forEach(run -> files.addAll(run.files()));
+        return files;
+    }
+
+    private List<DataFileMeta> flatFiles(List<List<DataFileMeta>> section) {
+        List<DataFileMeta> files = new ArrayList<>();
+        section.forEach(files::addAll);
+        return files;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
index fb0ce382..4680ad3a 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
@@ -18,17 +18,12 @@
 
 package org.apache.flink.table.store.table.source;
 
-import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 
 import java.util.List;
-import java.util.Map;
 
-/**
- * Generate {@link Split}s from a map with partition and bucket as keys and 
{@link DataFileMeta}s as
- * values.
- */
+/** Generate splits from {@link DataFileMeta}s. */
 public interface SplitGenerator {
 
-    List<Split> generate(Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFileMetas);
+    List<List<DataFileMeta>> split(List<DataFileMeta> files);
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 1769e6e0..4a0e52f6 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.CompoundPredicate;
 import org.apache.flink.table.store.file.predicate.LeafPredicate;
@@ -31,6 +34,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split 
generation. */
@@ -90,11 +94,37 @@ public abstract class TableScan {
 
     public Plan plan() {
         FileStoreScan.Plan plan = scan.plan();
-        return new Plan(
-                plan.snapshotId(),
-                new 
DefaultSplitGenerator(pathFactory).generate(plan.groupByPartFiles()));
+        return new Plan(plan.snapshotId(), 
generateSplits(plan.groupByPartFiles()));
     }
 
+    private List<Split> generateSplits(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFiles) {
+        return generateSplits(pathFactory, splitGenerator(pathFactory), 
groupedDataFiles);
+    }
+
+    @VisibleForTesting
+    public static List<Split> generateSplits(
+            FileStorePathFactory pathFactory,
+            SplitGenerator splitGenerator,
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
groupedDataFiles) {
+        List<Split> splits = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
+                groupedDataFiles.entrySet()) {
+            BinaryRowData partition = entry.getKey();
+            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
+                int bucket = bucketEntry.getKey();
+                Path bucketPath = pathFactory.bucketPath(partition, bucket);
+                splitGenerator.split(bucketEntry.getValue()).stream()
+                        .map(files -> new Split(partition, bucket, files, 
bucketPath))
+                        .forEach(splits::add);
+            }
+        }
+        return splits;
+    }
+
+    protected abstract SplitGenerator splitGenerator(FileStorePathFactory 
pathFactory);
+
     protected abstract void withNonPartitionFilter(Predicate predicate);
 
     protected Optional<Predicate> mapFilterFields(Predicate predicate, int[] 
fieldIdxMapping) {
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
new file mode 100644
index 00000000..d515a891
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+
+/** Utils for {@link DataFileMeta}. */
+public class DataFileTestUtils {
+
+    public static DataFileMeta fromMinMax(String name, int minKey, int maxKey) 
{
+        return newFile(name, 0, minKey, maxKey, 0);
+    }
+
+    public static DataFileMeta newFile(int level, int minKey, int maxKey, long 
maxSequence) {
+        return newFile("", level, minKey, maxKey, maxSequence);
+    }
+
+    public static DataFileMeta newFile(
+            String name, int level, int minKey, int maxKey, long maxSequence) {
+        return new DataFileMeta(
+                name,
+                maxKey - minKey + 1,
+                1,
+                row(minKey),
+                row(maxKey),
+                null,
+                null,
+                0,
+                maxSequence,
+                0,
+                level);
+    }
+
+    public static BinaryRowData row(int i) {
+        BinaryRowData row = new BinaryRowData(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeInt(0, i);
+        writer.complete();
+        return row;
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 51a56305..de636f63 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.data.writer.BinaryRowWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileTestUtils;
 import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CompactManager}. */
@@ -203,27 +204,8 @@ public class CompactManagerTest {
         assertThat(outputs).isEqualTo(expected);
     }
 
-    private static DataFileMeta newFile(int level, int minKey, int maxKey, 
long maxSequence) {
-        return new DataFileMeta(
-                "",
-                maxKey - minKey + 1,
-                1,
-                row(minKey),
-                row(maxKey),
-                null,
-                null,
-                0,
-                maxSequence,
-                0,
-                level);
-    }
-
     public static BinaryRowData row(int i) {
-        BinaryRowData row = new BinaryRowData(1);
-        BinaryRowWriter writer = new BinaryRowWriter(row);
-        writer.writeInt(0, i);
-        writer.complete();
-        return row;
+        return DataFileTestUtils.row(i);
     }
 
     private CompactStrategy testStrategy() {
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitGeneratorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitGeneratorTest.java
new file mode 100644
index 00000000..9a8feb0b
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitGeneratorTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.store.file.data.DataFileTestUtils.fromMinMax;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AppendOnlySplitGenerator} and {@link 
MergeTreeSplitGenerator}. */
+public class SplitGeneratorTest {
+
+    private final List<DataFileMeta> files =
+            Arrays.asList(
+                    fromMinMax("1", 0, 10),
+                    fromMinMax("2", 0, 12),
+                    fromMinMax("3", 15, 60),
+                    fromMinMax("4", 18, 40),
+                    fromMinMax("5", 82, 85),
+                    fromMinMax("6", 100, 200));
+
+    @Test
+    public void testAppend() {
+        assertThat(toNames(new AppendOnlySplitGenerator(40, 2).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "5"),
+                        Collections.singletonList("3"),
+                        Collections.singletonList("4"),
+                        Collections.singletonList("6"));
+
+        assertThat(toNames(new AppendOnlySplitGenerator(50, 2).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "4"),
+                        Arrays.asList("3", "5"),
+                        Collections.singletonList("6"));
+
+        assertThat(toNames(new AppendOnlySplitGenerator(50, 10).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "4"),
+                        Collections.singletonList("3"),
+                        Collections.singletonList("5"),
+                        Collections.singletonList("6"));
+    }
+
+    @Test
+    public void testMergeTree() {
+        Comparator<RowData> comparator = Comparator.comparingInt(o -> 
o.getInt(0));
+        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 40, 
2).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "5"),
+                        Arrays.asList("4", "3"),
+                        Collections.singletonList("6"));
+
+        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 40, 
20).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2"),
+                        Arrays.asList("4", "3"),
+                        Collections.singletonList("5"),
+                        Collections.singletonList("6"));
+    }
+
+    private List<List<String>> toNames(List<List<DataFileMeta>> splits) {
+        return splits.stream()
+                .map(
+                        files ->
+                                files.stream()
+                                        .map(DataFileMeta::fileName)
+                                        .collect(Collectors.toList()))
+                .collect(Collectors.toList());
+    }
+}

Reply via email to