This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c80f600c [FLINK-27696] Add bin-pack strategy to split the whole bucket data files into several small splits
c80f600c is described below
commit c80f600c149fc90f8522f43af67f0ab2a713b57f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 21 13:46:24 2022 +0800
[FLINK-27696] Add bin-pack strategy to split the whole bucket data files into several small splits
This closes #165
---
.../source/FileStoreSourceSplitGeneratorTest.java | 14 +--
.../flink/table/store/file/FileStoreOptions.java | 27 ++++++
.../flink/table/store/file/KeyValueFileStore.java | 6 +-
.../table/store/file/data/DataFilePathFactory.java | 6 +-
.../store/file/utils/FileStorePathFactory.java | 4 +
.../store/table/AppendOnlyFileStoreTable.java | 9 ++
.../table/ChangelogValueCountFileStoreTable.java | 11 +++
.../table/ChangelogWithKeyFileStoreTable.java | 11 +++
...enerator.java => AppendOnlySplitGenerator.java} | 25 +++--
.../store/table/source/DefaultSplitGenerator.java | 60 ------------
.../table/source/MergeTreeSplitGenerator.java | 106 +++++++++++++++++++++
.../table/store/table/source/SplitGenerator.java | 9 +-
.../flink/table/store/table/source/TableScan.java | 36 ++++++-
.../table/store/file/data/DataFileTestUtils.java | 58 +++++++++++
.../file/mergetree/compact/CompactManagerTest.java | 24 +----
.../store/table/source/SplitGeneratorTest.java | 96 +++++++++++++++++++
16 files changed, 394 insertions(+), 108 deletions(-)
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 6af47d5d..76331ef4 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -25,7 +25,7 @@ import
org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.table.source.DefaultSplitGenerator;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableScan;
import org.junit.jupiter.api.Test;
@@ -77,12 +77,12 @@ public class FileStoreSourceSplitGeneratorTest {
makeEntry(6, 1, "f14"));
}
};
- TableScan.Plan tableScanPlan =
- new TableScan.Plan(
- 1L,
- new DefaultSplitGenerator(
- new FileStorePathFactory(new
Path(tempDir.toString())))
- .generate(plan.groupByPartFiles()));
+ List<Split> scanSplits =
+ TableScan.generateSplits(
+ new FileStorePathFactory(new
Path(tempDir.toUri().toString())),
+ Collections::singletonList,
+ plan.groupByPartFiles());
+ TableScan.Plan tableScanPlan = new TableScan.Plan(1L, scanSplits);
List<FileStoreSourceSplit> splits =
new
FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 569ecc4b..0d01be5d 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -141,6 +141,20 @@ public class FileStoreOptions implements Serializable {
.list(formatEnumOption(WriteMode.CHANGE_LOG))
.build());
+ public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
+ ConfigOptions.key("source.split.target-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(128))
+ .withDescription("Target size of a source split when
scanning a bucket.");
+
+ public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
+ ConfigOptions.key("source.split.open-file-cost")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(4))
+ .withDescription(
+ "Open file cost of a source file. It is used to
avoid reading"
+ + " too many files with a source split,
which can be very slow.");
+
private final Configuration options;
public static Set<ConfigOption<?>> allOptions() {
@@ -155,6 +169,11 @@ public class FileStoreOptions implements Serializable {
allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
allOptions.add(SNAPSHOT_TIME_RETAINED);
+ allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
+ allOptions.add(MERGE_ENGINE);
+ allOptions.add(WRITE_MODE);
+ allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
+ allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
return allOptions;
}
@@ -237,6 +256,14 @@ public class FileStoreOptions implements Serializable {
return options.get(MANIFEST_MERGE_MIN_COUNT);
}
+ public long splitTargetSize() {
+ return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
+ }
+
+ public long splitOpenFileCost() {
+ return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index ec6a6335..6d2dc3a8 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -66,7 +66,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
schemaId,
keyType,
valueType,
- keyComparatorSupplier.get(),
+ newKeyComparator(),
mergeFunction,
options.fileFormat(),
pathFactory());
@@ -98,4 +98,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
options.bucket(),
checkNumOfBuckets);
}
+
+ public Comparator<RowData> newKeyComparator() {
+ return keyComparatorSupplier.get();
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
index d27a3ff8..ce573e61 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFilePathFactory.java
@@ -37,7 +37,7 @@ public class DataFilePathFactory {
private final String formatIdentifier;
public DataFilePathFactory(Path root, String partition, int bucket, String
formatIdentifier) {
- this.bucketDir = new Path(root + "/" + partition + "/bucket-" +
bucket);
+ this.bucketDir = bucketPath(root, partition, bucket);
this.uuid = UUID.randomUUID().toString();
this.pathCount = new AtomicInteger(0);
@@ -68,4 +68,8 @@ public class DataFilePathFactory {
public String uuid() {
return uuid;
}
+
+ public static Path bucketPath(Path tablePath, String partition, int
bucket) {
+ return new Path(tablePath + "/" + partition + "/bucket-" + bucket);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index a992a74e..d4db70b5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -111,6 +111,10 @@ public class FileStorePathFactory {
root, getPartitionString(partition), bucket, formatIdentifier);
}
+ public Path bucketPath(BinaryRowData partition, int bucket) {
+ return DataFilePathFactory.bucketPath(root,
getPartitionString(partition), bucket);
+ }
+
/** IMPORTANT: This method is NOT THREAD SAFE. */
public String getPartitionString(BinaryRowData partition) {
return PartitionPathUtils.generatePartitionPath(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 7f0de998..a30f1f61 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -29,12 +29,15 @@ import
org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.AbstractTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
+import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.types.RowKind;
@@ -66,6 +69,12 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
public TableScan newScan() {
AppendOnlyFileStoreScan scan = store.newScan();
return new TableScan(scan, schema, store.pathFactory()) {
+ @Override
+ protected SplitGenerator splitGenerator(FileStorePathFactory
pathFactory) {
+ return new AppendOnlySplitGenerator(
+ store.options().splitTargetSize(),
store.options().splitOpenFileCost());
+ }
+
@Override
protected void withNonPartitionFilter(Predicate predicate) {
scan.withFilter(predicate);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index c9f85f80..b41b5b78 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -31,6 +31,7 @@ import
org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.AbstractTableWrite;
@@ -38,6 +39,8 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
+import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
+import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import
org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -75,6 +78,14 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
public TableScan newScan() {
KeyValueFileStoreScan scan = store.newScan();
return new TableScan(scan, schema, store.pathFactory()) {
+ @Override
+ protected SplitGenerator splitGenerator(FileStorePathFactory
pathFactory) {
+ return new MergeTreeSplitGenerator(
+ store.newKeyComparator(),
+ store.options().splitTargetSize(),
+ store.options().splitOpenFileCost());
+ }
+
@Override
protected void withNonPartitionFilter(Predicate predicate) {
scan.withKeyFilter(predicate);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 7109a91a..8421c811 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.AbstractTableWrite;
@@ -40,6 +41,8 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
+import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
+import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import
org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
@@ -111,6 +114,14 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
public TableScan newScan() {
KeyValueFileStoreScan scan = store.newScan();
return new TableScan(scan, schema, store.pathFactory()) {
+ @Override
+ protected SplitGenerator splitGenerator(FileStorePathFactory
pathFactory) {
+ return new MergeTreeSplitGenerator(
+ store.newKeyComparator(),
+ store.options().splitTargetSize(),
+ store.options().splitOpenFileCost());
+ }
+
@Override
protected void withNonPartitionFilter(Predicate predicate) {
// currently we can only perform filter push down on keys
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AppendOnlySplitGenerator.java
similarity index 56%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AppendOnlySplitGenerator.java
index fb0ce382..cca5cca2 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AppendOnlySplitGenerator.java
@@ -18,17 +18,26 @@
package org.apache.flink.table.store.table.source;
-import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.table.store.file.data.DataFileMeta;
import java.util.List;
-import java.util.Map;
+import java.util.function.Function;
-/**
- * Generate {@link Split}s from a map with partition and bucket as keys and
{@link DataFileMeta}s as
- * values.
- */
-public interface SplitGenerator {
+/** Append only implementation of {@link SplitGenerator}. */
+public class AppendOnlySplitGenerator implements SplitGenerator {
+
+ private final long targetSplitSize;
+ private final long openFileCost;
+
+ public AppendOnlySplitGenerator(long targetSplitSize, long openFileCost) {
+ this.targetSplitSize = targetSplitSize;
+ this.openFileCost = openFileCost;
+ }
- List<Split> generate(Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFileMetas);
+ @Override
+ public List<List<DataFileMeta>> split(List<DataFileMeta> files) {
+ Function<DataFileMeta, Long> weightFunc = file ->
Math.max(file.fileSize(), openFileCost);
+ return BinPacking.pack(files, weightFunc, targetSplitSize);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
deleted file mode 100644
index 01107bb8..00000000
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DefaultSplitGenerator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.source;
-
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/** Default implementation of {@link SplitGenerator}. The whole bucket is
marked as one split. */
-public class DefaultSplitGenerator implements SplitGenerator {
-
- private final FileStorePathFactory pathFactory;
-
- public DefaultSplitGenerator(FileStorePathFactory pathFactory) {
- this.pathFactory = pathFactory;
- }
-
- @Override
- public List<Split> generate(
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFileMetas) {
- List<Split> splits = new ArrayList<>();
- for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>>
entryWithPartition :
- groupedDataFileMetas.entrySet()) {
- BinaryRowData partition = entryWithPartition.getKey();
- for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
- entryWithPartition.getValue().entrySet()) {
- int bucket = entryWithBucket.getKey();
- splits.add(
- new Split(
- partition,
- bucket,
- entryWithBucket.getValue(),
- pathFactory
- .createDataFilePathFactory(partition,
bucket)
- .bucketPath()));
- }
- }
- return splits;
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/MergeTreeSplitGenerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/MergeTreeSplitGenerator.java
new file mode 100644
index 00000000..9a70266a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/MergeTreeSplitGenerator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.connector.file.table.BinPacking;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Merge tree implementation of {@link SplitGenerator}.
+ *
+ * <p>Files whose key ranges intersect are always kept in the same split; only whole sections
+ * produced by {@link IntervalPartition} are distributed across splits.
+ */
+public class MergeTreeSplitGenerator implements SplitGenerator {
+
+    private final Comparator<RowData> keyComparator;
+
+    private final long targetSplitSize;
+
+    private final long openFileCost;
+
+    public MergeTreeSplitGenerator(
+            Comparator<RowData> keyComparator, long targetSplitSize, long openFileCost) {
+        this.keyComparator = keyComparator;
+        this.targetSplitSize = targetSplitSize;
+        this.openFileCost = openFileCost;
+    }
+
+    @Override
+    public List<List<DataFileMeta>> split(List<DataFileMeta> files) {
+        /*
+         * The generator aims to parallelize the scan by slicing the files of each bucket into
+         * multiple splits. The generation has one constraint: files with intersecting key ranges
+         * (within one section) must go to the same split. Therefore, the files first go through
+         * the interval partition algorithm to generate sections and then through the bin-packing
+         * algorithm. Note that the item to be packed here is a whole section, the bin capacity
+         * is the targetSplitSize, and the final number of bins is the number of splits generated.
+         *
+         * For instance, there are files: [1, 2] [3, 4] [5, 180] [5, 190] [200, 600] [210, 700]
+         * with targetSplitSize 128M. After interval partition, there are four sections:
+         * - section1: [1, 2]
+         * - section2: [3, 4]
+         * - section3: [5, 180], [5, 190]
+         * - section4: [200, 600], [210, 700]
+         *
+         * After bin-packing, section1 and section2 will be put into one bin (split), so the final
+         * result will be:
+         * - split1: [1, 2] [3, 4]
+         * - split2: [5, 180] [5, 190]
+         * - split3: [200, 600] [210, 700]
+         */
+        List<List<DataFileMeta>> sections =
+                new IntervalPartition(files, keyComparator)
+                        .partition().stream().map(this::flatRun).collect(Collectors.toList());
+
+        return binPackSplits(sections);
+    }
+
+    /** Bin-packs whole sections into splits; a section is never divided between splits. */
+    private List<List<DataFileMeta>> binPackSplits(List<List<DataFileMeta>> sections) {
+        Function<List<DataFileMeta>, Long> weightFunc = this::sectionWeight;
+        return BinPacking.pack(sections, weightFunc, targetSplitSize).stream()
+                .map(this::flatFiles)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Weight of a section for bin-packing. Each file counts for at least {@code openFileCost},
+     * the same per-file rule as {@link AppendOnlySplitGenerator}, so that a section made of many
+     * small files is not treated as a single cheap item.
+     */
+    private long sectionWeight(List<DataFileMeta> section) {
+        long weight = 0L;
+        for (DataFileMeta file : section) {
+            weight += Math.max(file.fileSize(), openFileCost);
+        }
+        return weight;
+    }
+
+    /** Concatenates the files of all sorted runs in one section. */
+    private List<DataFileMeta> flatRun(List<SortedRun> section) {
+        List<DataFileMeta> files = new ArrayList<>();
+        section.forEach(run -> files.addAll(run.files()));
+        return files;
+    }
+
+    /** Concatenates the files of all sections that were packed into one bin (split). */
+    private List<DataFileMeta> flatFiles(List<List<DataFileMeta>> bin) {
+        List<DataFileMeta> files = new ArrayList<>();
+        bin.forEach(files::addAll);
+        return files;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
index fb0ce382..4680ad3a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SplitGenerator.java
@@ -18,17 +18,12 @@
package org.apache.flink.table.store.table.source;
-import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
import java.util.List;
-import java.util.Map;
-/**
- * Generate {@link Split}s from a map with partition and bucket as keys and
{@link DataFileMeta}s as
- * values.
- */
+/** Generate splits from {@link DataFileMeta}s. */
public interface SplitGenerator {
- List<Split> generate(Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFileMetas);
+ List<List<DataFileMeta>> split(List<DataFileMeta> files);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 1769e6e0..4a0e52f6 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -19,6 +19,9 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.CompoundPredicate;
import org.apache.flink.table.store.file.predicate.LeafPredicate;
@@ -31,6 +34,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/** An abstraction layer above {@link FileStoreScan} to provide input split
generation. */
@@ -90,11 +94,37 @@ public abstract class TableScan {
public Plan plan() {
FileStoreScan.Plan plan = scan.plan();
- return new Plan(
- plan.snapshotId(),
- new
DefaultSplitGenerator(pathFactory).generate(plan.groupByPartFiles()));
+ return new Plan(plan.snapshotId(),
generateSplits(plan.groupByPartFiles()));
}
+ private List<Split> generateSplits(
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFiles) {
+ return generateSplits(pathFactory, splitGenerator(pathFactory),
groupedDataFiles);
+ }
+
+ @VisibleForTesting
+ public static List<Split> generateSplits(
+ FileStorePathFactory pathFactory,
+ SplitGenerator splitGenerator,
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFiles) {
+ List<Split> splits = new ArrayList<>();
+ for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
+ groupedDataFiles.entrySet()) {
+ BinaryRowData partition = entry.getKey();
+ Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
buckets.entrySet()) {
+ int bucket = bucketEntry.getKey();
+ Path bucketPath = pathFactory.bucketPath(partition, bucket);
+ splitGenerator.split(bucketEntry.getValue()).stream()
+ .map(files -> new Split(partition, bucket, files,
bucketPath))
+ .forEach(splits::add);
+ }
+ }
+ return splits;
+ }
+
+ protected abstract SplitGenerator splitGenerator(FileStorePathFactory
pathFactory);
+
protected abstract void withNonPartitionFilter(Predicate predicate);
protected Optional<Predicate> mapFilterFields(Predicate predicate, int[]
fieldIdxMapping) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
new file mode 100644
index 00000000..d515a891
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+
+/** Utils for creating {@link DataFileMeta}s in tests. */
+public class DataFileTestUtils {
+
+    private DataFileTestUtils() {}
+
+    /** Creates a level-0 file with max sequence number 0 covering keys {@code [minKey, maxKey]}. */
+    public static DataFileMeta fromMinMax(String name, int minKey, int maxKey) {
+        return newFile(name, 0, minKey, maxKey, 0);
+    }
+
+    /** Creates a file with an empty name covering keys {@code [minKey, maxKey]}. */
+    public static DataFileMeta newFile(int level, int minKey, int maxKey, long maxSequence) {
+        return newFile("", level, minKey, maxKey, maxSequence);
+    }
+
+    /**
+     * Creates a file whose min/max keys are single-int rows built by {@link #row(int)}.
+     *
+     * <p>NOTE(review): the second {@link DataFileMeta} constructor argument is assumed to be the
+     * file size; it is set to the key-range width ({@code maxKey - minKey + 1}), and the expected
+     * splits in the split generator tests only add up under that reading.
+     */
+    public static DataFileMeta newFile(
+            String name, int level, int minKey, int maxKey, long maxSequence) {
+        return new DataFileMeta(
+                name,
+                maxKey - minKey + 1,
+                1,
+                row(minKey),
+                row(maxKey),
+                null,
+                null,
+                0,
+                maxSequence,
+                0,
+                level);
+    }
+
+    /** Creates a single-field {@link BinaryRowData} holding the int {@code i}. */
+    public static BinaryRowData row(int i) {
+        BinaryRowData row = new BinaryRowData(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeInt(0, i);
+        writer.complete();
+        return row;
+    }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 51a56305..de636f63 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.file.mergetree.compact;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileTestUtils;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.SortedRun;
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link CompactManager}. */
@@ -203,27 +204,8 @@ public class CompactManagerTest {
assertThat(outputs).isEqualTo(expected);
}
- private static DataFileMeta newFile(int level, int minKey, int maxKey,
long maxSequence) {
- return new DataFileMeta(
- "",
- maxKey - minKey + 1,
- 1,
- row(minKey),
- row(maxKey),
- null,
- null,
- 0,
- maxSequence,
- 0,
- level);
- }
-
public static BinaryRowData row(int i) {
- BinaryRowData row = new BinaryRowData(1);
- BinaryRowWriter writer = new BinaryRowWriter(row);
- writer.writeInt(0, i);
- writer.complete();
- return row;
+ return DataFileTestUtils.row(i);
}
private CompactStrategy testStrategy() {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitGeneratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitGeneratorTest.java
new file mode 100644
index 00000000..9a8feb0b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitGeneratorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.fromMinMax;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AppendOnlySplitGenerator} and {@link MergeTreeSplitGenerator}. */
+public class SplitGeneratorTest {
+
+    private final List<DataFileMeta> files =
+            Arrays.asList(
+                    fromMinMax("1", 0, 10),
+                    fromMinMax("2", 0, 12),
+                    fromMinMax("3", 15, 60),
+                    fromMinMax("4", 18, 40),
+                    fromMinMax("5", 82, 85),
+                    fromMinMax("6", 100, 200));
+
+    @Test
+    public void testAppend() {
+        assertThat(toNames(new AppendOnlySplitGenerator(40, 2).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "5"),
+                        Collections.singletonList("3"),
+                        Collections.singletonList("4"),
+                        Collections.singletonList("6"));
+
+        assertThat(toNames(new AppendOnlySplitGenerator(50, 2).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "4"),
+                        Arrays.asList("3", "5"),
+                        Collections.singletonList("6"));
+
+        assertThat(toNames(new AppendOnlySplitGenerator(50, 10).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "4"),
+                        Collections.singletonList("3"),
+                        Collections.singletonList("5"),
+                        Collections.singletonList("6"));
+    }
+
+    @Test
+    public void testMergeTree() {
+        Comparator<RowData> comparator = Comparator.comparingInt(o -> o.getInt(0));
+        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 40, 2).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2", "5"),
+                        Arrays.asList("4", "3"),
+                        Collections.singletonList("6"));
+
+        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 40, 20).split(files)))
+                .containsExactlyInAnyOrder(
+                        Arrays.asList("1", "2"),
+                        Arrays.asList("4", "3"),
+                        Collections.singletonList("5"),
+                        Collections.singletonList("6"));
+    }
+
+    /** Maps each split to the names of its files, keeping the file order within the split. */
+    private List<List<String>> toNames(List<List<DataFileMeta>> splits) {
+        return splits.stream()
+                .map(
+                        split ->
+                                split.stream()
+                                        .map(DataFileMeta::fileName)
+                                        .collect(Collectors.toList()))
+                .collect(Collectors.toList());
+    }
+}