This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit c4bf2368741aab4686ef4ec6b5a28e157d55aca0
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Sep 29 13:06:35 2025 +0800

    [core] Support incremental clustering for append unaware table (#6338)
---
 .../content/append-table/incremental-clustering.md | 134 ++++++++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 +-
 .../append/cluster/IncrementalClusterManager.java  | 242 +++++++++++++++
 .../append/cluster/IncrementalClusterStrategy.java |  89 ++++++
 .../org/apache/paimon/schema/SchemaValidation.java |  15 +
 .../cluster/IncrementalClusterManagerTest.java     | 206 +++++++++++++
 .../cluster/IncrementalClusterStrategyTest.java    | 222 ++++++++++++++
 .../apache/paimon/flink/action/CompactAction.java  |   4 +
 .../apache/paimon/flink/sink/AppendTableSink.java  |   6 +-
 .../paimon/spark/procedure/CompactProcedure.java   | 131 +++++++-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |   2 +-
 .../spark/procedure/CompactProcedureTestBase.scala | 337 ++++++++++++++++++++-
 13 files changed, 1396 insertions(+), 9 deletions(-)

diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md
new file mode 100644
index 0000000000..72f24ec17e
--- /dev/null
+++ b/docs/content/append-table/incremental-clustering.md
@@ -0,0 +1,134 @@
+---
+title: "Incremental Clustering"
+weight: 4
+type: docs
+aliases:
+- /append-table/incremental-clustering.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Incremental Clustering
+
+Paimon currently supports ordering append tables using SFC (Space-Filling Curve) (see [sort compact]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}) for more info).
+The resulting data layout typically delivers better performance for queries that target clustering keys.
+However, with the current SortCompaction, even when neither the data nor the clustering keys have changed,
+each run still rewrites the entire dataset, which is extremely costly.
+
+To address this, Paimon introduces a more flexible, incremental clustering mechanism: Incremental Clustering.
+On each run, it selects only a specific subset of files to cluster, avoiding a full rewrite. This enables low-cost,
+sort-based optimization of the data layout and improves query performance. In addition, with Incremental Clustering,
+you can adjust clustering keys without rewriting existing data: the layout evolves dynamically as clustering runs
+and gradually converges to an optimal state, significantly reducing the decision-making complexity around data layout.
+
+Incremental Clustering supports:
+- Incremental clustering; write amplification is minimized as much as possible.
+- Small-file compaction; during rewrites, the target file size is respected.
+- Changing clustering keys; newly ingested data is clustered according to the latest clustering keys.
+- A full mode; when selected, the entire dataset is reclustered.
+
+**Only append unaware-bucket tables support Incremental Clustering.**
+
+## Enable Incremental Clustering
+
+To enable Incremental Clustering, the following configuration needs to be set for the table:
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Option</th>
+      <th class="text-left" style="width: 10%">Value</th>
+      <th class="text-left" style="width: 5%">Required</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>clustering.incremental</h5></td>
+      <td>true</td>
+      <td style="word-wrap: break-word;">Yes</td>
+      <td>Boolean</td>
+      <td>Must be set to true to enable incremental clustering. Default is false.</td>
+    </tr>
+    <tr>
+      <td><h5>clustering.columns</h5></td>
+      <td>'clustering-columns'</td>
+      <td style="word-wrap: break-word;">Yes</td>
+      <td>String</td>
+      <td>The clustering columns, in the format 'columnName1,columnName2'. It is not recommended to use partition keys as clustering keys.</td>
+    </tr>
+    <tr>
+      <td><h5>clustering.strategy</h5></td>
+      <td>'zorder' or 'hilbert' or 'order'</td>
+      <td style="word-wrap: break-word;">No</td>
+      <td>String</td>
+      <td>The ordering algorithm used for clustering. If not set, it will be decided based on the number of clustering columns: 'order' is used for 1 column, 'zorder' for less than 5 columns, and 'hilbert' for 5 or more columns.</td>
+    </tr>
+    </tbody>
+
+</table>
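For illustration (this example is an editorial addition, not part of the committed documentation file), the options above can be set when the table is created. The statement below mirrors the Spark test added later in this commit; the table name `T` and its schema are only placeholders:

```sql
-- 'bucket' = '-1' keeps this an append unaware-bucket table, which is required for incremental clustering
CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES (
    'bucket' = '-1',
    'clustering.incremental' = 'true',
    'clustering.columns' = 'a,b',
    'clustering.strategy' = 'zorder'
);
```

The clustering-related properties can presumably also be adjusted later with `ALTER TABLE ... SET TBLPROPERTIES`, since Incremental Clustering allows clustering keys to change without rewriting existing data.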
+
+Once Incremental Clustering is enabled for a table, you can run Incremental Clustering in batch mode periodically
+to continuously optimize the data layout of the table and deliver better query performance.
+
+**Note**: Since common compaction also rewrites files, it may disrupt the ordered data layout built by Incremental Clustering.
+Therefore, when Incremental Clustering is enabled, the table no longer supports write-time compaction or dedicated compaction;
+clustering and small-file merging must be performed exclusively via Incremental Clustering runs.
+
+## Run Incremental Clustering
+{{< hint info >}}
+
+Currently, Incremental Clustering can only be run in Spark; support for Flink will be added in the near future.
+
+{{< /hint >}}
+
+To run an Incremental Clustering job, follow these instructions.
+
+{{< tabs "incremental-clustering" >}}
+
+{{< tab "Spark SQL" >}}
+
+Run the following SQL:
+
+```sql
+-- set the write parallelism; if it is too large, a large number of small files may be generated.
+SET spark.sql.shuffle.partitions=10;
+
+-- run incremental clustering
+CALL sys.compact(table => 'T')
+
+-- run incremental clustering in full mode, which reclusters all data
+CALL sys.compact(table => 'T', compact_strategy => 'full')
+```
+You don’t need to specify any clustering-related parameters when running Incremental Clustering;
+these are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Implementation
+To balance write amplification and sorting effectiveness, Paimon leverages the LSM Tree notion of levels to stratify data files
+and uses the Universal Compaction strategy to select files for clustering.
+- Newly written data lands in level-0; files in level-0 are unclustered.
+- All files in level-i are produced by sorting within the same sorting set. +- By analogy with Universal Compaction: in level-0, each file is a sorted run; in level-i, all files together constitute a single sorted run. During clustering, the sorted run is the basic unit of work. + +By introducing more levels, we can control the amount of data processed in each clustering run. +Data at higher levels is more stably clustered and less likely to be rewritten, thereby mitigating write amplification while maintaining good sorting effectiveness. diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 0a6f7f2eb9..3411e2bd92 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -146,6 +146,12 @@ under the License. <td>String</td> <td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. This option will be effective only for append table without primary keys and batch execution mode.</td> </tr> + <tr> + <td><h5>clustering.incremental</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether enable incremental clustering.</td> + </tr> <tr> <td><h5>clustering.strategy</h5></td> <td style="word-wrap: break-word;">"auto"</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 355a5fe357..d5209e54db 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1933,7 +1933,12 @@ public class CoreOptions implements Serializable { + "in 'clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, " + "and 'hilbert' for 5 or more columns."); - @Immutable + public static final ConfigOption<Boolean> CLUSTERING_INCREMENTAL = + key("clustering.incremental") + .booleanType() + .defaultValue(false) + .withDescription("Whether enable incremental clustering."); + public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED = key("row-tracking.enabled") .booleanType() @@ -2997,6 +3002,10 @@ public class CoreOptions implements Serializable { return clusteringColumns(options.get(CLUSTERING_COLUMNS)); } + public boolean clusteringIncrementalEnabled() { + return options.get(CLUSTERING_INCREMENTAL); + } + public OrderType clusteringStrategy(int columnSize) { return clusteringStrategy(options.get(CLUSTERING_STRATEGY), columnSize); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java new file mode 100644 index 0000000000..887150577c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.mergetree.SortedRun; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.SplitGenerator; +import org.apache.paimon.table.source.snapshot.SnapshotReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Manager for Incremental Clustering. */ +public class IncrementalClusterManager { + + private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class); + + private final SnapshotReader snapshotReader; + + private final IncrementalClusterStrategy incrementalClusterStrategy; + private final CoreOptions.OrderType clusterCurve; + private final List<String> clusterKeys; + + private int maxLevel; + + public IncrementalClusterManager(FileStoreTable table) { + checkArgument( + table.bucketMode() == BucketMode.BUCKET_UNAWARE, + "only append unaware-bucket table support incremental clustering."); + // drop stats to reduce memory usage + this.snapshotReader = table.newSnapshotReader().dropStats(); + CoreOptions options = table.coreOptions(); + checkArgument( + options.clusteringIncrementalEnabled(), + "Only support incremental clustering when '%s' is true.", + CLUSTERING_INCREMENTAL.key()); + this.incrementalClusterStrategy = + new IncrementalClusterStrategy( + table.schemaManager(), + options.clusteringColumns(), + options.maxSizeAmplificationPercent(), + options.sortedRunSizeRatio(), + options.numSortedRunCompactionTrigger()); + this.clusterCurve = options.clusteringStrategy(options.clusteringColumns().size()); + this.clusterKeys = options.clusteringColumns(); + this.maxLevel = options.numLevels(); + } + + public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) { + // 1. construct LSM structure for each partition + Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels(); + if (LOG.isDebugEnabled()) { + partitionLevels.forEach( + (partition, levelSortedRuns) -> { + String runsInfo = + levelSortedRuns.stream() + .map( + lsr -> + String.format( + "level-%s:%s", + lsr.level(), + lsr.run().files().size())) + .collect(Collectors.joining(",")); + LOG.debug( + "Partition {} has {} runs: [{}]", + partition, + levelSortedRuns.size(), + runsInfo); + }); + } + + // 2. 
pick files to be clustered for each partition + Map<BinaryRow, Optional<CompactUnit>> units = + partitionLevels.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + incrementalClusterStrategy.pick( + maxLevel, + entry.getValue(), + fullCompaction))); + + // 3. filter out empty units + Map<BinaryRow, CompactUnit> filteredUnits = + units.entrySet().stream() + .filter(entry -> entry.getValue().isPresent()) + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> entry.getValue().get())); + if (LOG.isDebugEnabled()) { + filteredUnits.forEach( + (partition, compactUnit) -> { + String filesInfo = + compactUnit.files().stream() + .map( + file -> + String.format( + "%s,%s,%s", + file.fileName(), + file.level(), + file.fileSize())) + .collect(Collectors.joining(", ")); + LOG.debug( + "Partition {}, outputLevel:{}, clustered with {} files: [{}]", + partition, + compactUnit.outputLevel(), + compactUnit.files().size(), + filesInfo); + }); + } + return filteredUnits; + } + + public Map<BinaryRow, List<LevelSortedRun>> constructLevels() { + List<DataSplit> dataSplits = snapshotReader.read().dataSplits(); + + maxLevel = + Math.max( + maxLevel, + dataSplits.stream() + .flatMap(split -> split.dataFiles().stream()) + .mapToInt(DataFileMeta::level) + .max() + .orElse(-1) + + 1); + checkArgument(maxLevel > 1, "Number of levels must be at least 2."); + + Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>(); + for (DataSplit dataSplit : dataSplits) { + partitionFiles + .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>()) + .addAll(dataSplit.dataFiles()); + } + + return partitionFiles.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> constructPartitionLevels(entry.getValue()))); + } + + public List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> partitionFiles) { + List<LevelSortedRun> partitionLevels = new ArrayList<>(); + Map<Integer, List<DataFileMeta>> levelMap = + partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level)); + + for (Map.Entry<Integer, List<DataFileMeta>> entry : levelMap.entrySet()) { + int level = entry.getKey(); + if (level == 0) { + for (DataFileMeta level0File : entry.getValue()) { + partitionLevels.add( + new LevelSortedRun(level, SortedRun.fromSingle(level0File))); + } + } else { + // don't need to guarantee that the files within the same sorted run are + // non-overlapping here, so we call SortedRun.fromSorted() to avoid sorting and + // validation + partitionLevels.add( + new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue()))); + } + } + + // sort by level + partitionLevels.sort(Comparator.comparing(LevelSortedRun::level)); + return partitionLevels; + } + + public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) { + List<DataSplit> splits = new ArrayList<>(); + + DataSplit.Builder builder = + DataSplit.builder() + .withPartition(partition) + .withBucket(0) + .withTotalBuckets(1) + .isStreaming(false); + + SplitGenerator splitGenerator = snapshotReader.splitGenerator(); + List<SplitGenerator.SplitGroup> splitGroups = splitGenerator.splitForBatch(files); + + for (SplitGenerator.SplitGroup splitGroup : splitGroups) { + List<DataFileMeta> dataFiles = splitGroup.files; + String bucketPath = snapshotReader.pathFactory().bucketPath(partition, 0).toString(); + builder.withDataFiles(dataFiles) + .rawConvertible(splitGroup.rawConvertible) + .withBucketPath(bucketPath); + + splits.add(builder.build()); + } + + return splits; + } + + 
public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, int outputLevel) { + return filesAfterCluster.stream() + .map(file -> file.upgrade(outputLevel)) + .collect(Collectors.toList()); + } + + public CoreOptions.OrderType clusterCurve() { + return clusterCurve; + } + + public List<String> clusterKeys() { + return clusterKeys; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterStrategy.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterStrategy.java new file mode 100644 index 0000000000..1a90c16ce3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterStrategy.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.mergetree.compact.UniversalCompaction; +import org.apache.paimon.schema.SchemaManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** Cluster strategy to decide which files to select for cluster. 
*/ +public class IncrementalClusterStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterStrategy.class); + + private final List<String> clusterKeys; + private final SchemaManager schemaManager; + + private final UniversalCompaction universalCompaction; + + public IncrementalClusterStrategy( + SchemaManager schemaManager, + List<String> clusterKeys, + int maxSizeAmp, + int sizeRatio, + int numRunCompactionTrigger) { + this.universalCompaction = + new UniversalCompaction(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null, null); + this.clusterKeys = clusterKeys; + this.schemaManager = schemaManager; + } + + public Optional<CompactUnit> pick( + int numLevels, List<LevelSortedRun> runs, boolean fullCompaction) { + if (fullCompaction) { + return pickFullCompaction(numLevels, runs); + } + return universalCompaction.pick(numLevels, runs); + } + + public Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) { + int maxLevel = numLevels - 1; + if (runs.isEmpty()) { + // no sorted run, no need to compact + if (LOG.isDebugEnabled()) { + LOG.debug("no sorted run, no need to compact"); + } + return Optional.empty(); + } + + if (runs.size() == 1 && runs.get(0).level() == maxLevel) { + long schemaId = runs.get(0).run().files().get(0).schemaId(); + CoreOptions coreOptions = CoreOptions.fromMap(schemaManager.schema(schemaId).options()); + // only one sorted run in the maxLevel with the same cluster key + if (coreOptions.clusteringColumns().equals(clusterKeys)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "only one sorted run in the maxLevel with the same cluster key, no need to compact"); + } + return Optional.empty(); + } + } + + // full compaction + return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 9d73081805..09de891a37 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -239,6 +239,8 @@ public class SchemaValidation { validateMergeFunctionFactory(schema); validateRowTracking(schema, options); + + validateIncrementalClustering(schema, options); } public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) { @@ -648,4 +650,17 @@ public class SchemaValidation { "Data evolution config must disabled with deletion-vectors.enabled"); } } + + private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) { + if (options.clusteringIncrementalEnabled()) { + checkArgument( + options.bucket() == -1, + "Cannot define %s for incremental clustering table, it only support bucket = -1", + CoreOptions.BUCKET.key()); + checkArgument( + schema.primaryKeys().isEmpty(), + "Cannot define %s for incremental clustering table.", + PRIMARY_KEY.key()); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java new file mode 100644 index 0000000000..8b37aff0b8 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link IncrementalClusterManager}. */ +public class IncrementalClusterManagerTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testNonUnAwareBucketTable() throws Exception { + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), "1"); + options.put(CoreOptions.BUCKET_KEY.key(), "f0"); + + assertThatThrownBy(() -> createTable(options, Collections.emptyList())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot define bucket for incremental clustering table, it only support bucket = -1"); + } + + @Test + public void testNonClusterIncremental() throws Exception { + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), "-1"); + options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "false"); + FileStoreTable table = createTable(options, Collections.emptyList()); + assertThatThrownBy(() -> new IncrementalClusterManager(table)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Only support incremental clustering when 'clustering.incremental' is true."); + } + + @Test + public void testConstructPartitionLevels() throws Exception { + // Create a valid table for IncrementalClusterManager + Map<String, String> options = new HashMap<>(); + FileStoreTable table = createTable(options, Collections.emptyList()); + IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); + + // Create test files with different levels + List<DataFileMeta> partitionFiles = new ArrayList<>(); + + // Level 0 files (should be individual LevelSortedRuns) + DataFileMeta level0File1 = createFile(100, 1, 0); + DataFileMeta level0File2 = createFile(200, 1, 0); + partitionFiles.add(level0File1); + partitionFiles.add(level0File2); + + // Level 1 files (should be grouped into one LevelSortedRun) + DataFileMeta level1File1 = 
createFile(300, 1, 1); + DataFileMeta level1File2 = createFile(400, 1, 1); + partitionFiles.add(level1File1); + partitionFiles.add(level1File2); + + // Level 2 files (should be grouped into one LevelSortedRun) + DataFileMeta level2File1 = createFile(500, 1, 2); + partitionFiles.add(level2File1); + + // Call the method under test + List<LevelSortedRun> result = + incrementalClusterManager.constructPartitionLevels(partitionFiles); + + // Verify the results + assertThat(result).hasSize(4); // 2 level-0 runs + 1 level-1 run + 1 level-2 run + + // Verify sorting by level + assertThat(result.get(0).level()).isEqualTo(0); + assertThat(result.get(1).level()).isEqualTo(0); + assertThat(result.get(2).level()).isEqualTo(1); + assertThat(result.get(3).level()).isEqualTo(2); + + // Verify level 0 files are individual runs + assertThat(result.get(0).run().files()).hasSize(1); + assertThat(result.get(1).run().files()).hasSize(1); + + // Verify level 1 files are grouped together + assertThat(result.get(2).run().files()).hasSize(2); + assertThat(result.get(2).run().files()).containsExactlyInAnyOrder(level1File1, level1File2); + + // Verify level 2 file + assertThat(result.get(3).run().files()).hasSize(1); + assertThat(result.get(3).run().files()).containsExactly(level2File1); + } + + @Test + public void testUpgrade() throws Exception { + // Create a valid table for IncrementalClusterManager + Map<String, String> options = new HashMap<>(); + FileStoreTable table = createTable(options, Collections.emptyList()); + IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); + + // Create test files with different levels + List<DataFileMeta> filesAfterCluster = new ArrayList<>(); + DataFileMeta file1 = createFile(100, 1, 0); + DataFileMeta file2 = createFile(200, 1, 1); + DataFileMeta file3 = createFile(300, 1, 2); + filesAfterCluster.add(file1); + filesAfterCluster.add(file2); + filesAfterCluster.add(file3); + + // Test upgrading to level 3 + int outputLevel = 3; + List<DataFileMeta> upgradedFiles = + incrementalClusterManager.upgrade(filesAfterCluster, outputLevel); + + // Verify the results + assertThat(upgradedFiles).hasSize(3); + + // Verify all files are upgraded to the specified output level + for (DataFileMeta upgradedFile : upgradedFiles) { + assertThat(upgradedFile.level()).isEqualTo(outputLevel); + } + } + + private FileStoreTable createTable( + Map<String, String> customOptions, List<String> partitionKeys) throws Exception { + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), "-1"); + options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1"); + options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true"); + options.putAll(customOptions); + + Schema schema = + new Schema( + RowType.of( + DataTypes.INT(), + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING()) + .getFields(), + partitionKeys, + Collections.emptyList(), + options, + ""); + + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())); + return FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(tempDir.toString()), + schemaManager.createTable(schema)); + } + + private static DataFileMeta createFile(long size, long schemaId, int level) { + return DataFileMeta.create( + "", + size, + 1, + null, + null, + null, + null, + 0, + 0, + schemaId, + level, + null, + null, + FileSource.APPEND, + null, + null, + null); + } +} diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java new file mode 100644 index 0000000000..1061c50c9e --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.mergetree.SortedRun; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.util.Collections.emptyList; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link IncrementalClusterStrategy}. 
*/ +public class IncrementalClusterStrategyTest { + + @TempDir static java.nio.file.Path tempDir; + + private static SchemaManager schemaManager; + private static IncrementalClusterStrategy incrementalClusterStrategy; + + @BeforeAll + public static void setUp() throws Exception { + schemaManager = new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())); + prepareSchema(); + incrementalClusterStrategy = + new IncrementalClusterStrategy(schemaManager, Arrays.asList("f0", "f1"), 25, 1, 3); + } + + @Test + public void testPickFullCompactionWithEmptyRuns() { + // Test case: empty runs should return empty + Optional<CompactUnit> result = + incrementalClusterStrategy.pickFullCompaction(3, Collections.emptyList()); + assertThat(result.isPresent()).isFalse(); + } + + @Test + public void testPickFullCompactionWithSingleRunSameClusterKey() { + // Test case: single run at max level with same cluster key should return empty + // Using schema-0 which has clustering columns "f0,f1" (same as clusterKeys) + int maxLevel = 2; + DataFileMeta file = createFile(1, 0L, maxLevel); + LevelSortedRun run = new LevelSortedRun(maxLevel, SortedRun.fromSingle(file)); + List<LevelSortedRun> runs = Collections.singletonList(run); + + Optional<CompactUnit> result = incrementalClusterStrategy.pickFullCompaction(3, runs); + assertThat(result.isPresent()).isFalse(); + } + + @Test + public void testPickFullCompactionWithSingleRunDifferentClusterKey() { + // Test case: single run at max level with different cluster key should return compaction + // Using schema-1 which has clustering columns "f2,f3" (different from clusterKeys "f0,f1") + int maxLevel = 2; + DataFileMeta file = createFile(1, 1L, maxLevel); // Use schema-1 + LevelSortedRun run = new LevelSortedRun(maxLevel, SortedRun.fromSingle(file)); + List<LevelSortedRun> runs = Collections.singletonList(run); + + Optional<CompactUnit> result = incrementalClusterStrategy.pickFullCompaction(3, runs); + assertThat(result.isPresent()).isTrue(); + assertThat(result.get().outputLevel()).isEqualTo(maxLevel); + assertThat(result.get().files()).hasSize(1); + assertThat(result.get().files().get(0).fileSize()).isEqualTo(1); + } + + @Test + public void testPickFullCompactionWithSingleRunNotAtMaxLevel() { + // Test case: single run not at max level should return compaction + int maxLevel = 2; + int runLevel = 1; + DataFileMeta file = createFile(1, 0L, runLevel); + LevelSortedRun run = new LevelSortedRun(runLevel, SortedRun.fromSingle(file)); + List<LevelSortedRun> runs = Collections.singletonList(run); + + Optional<CompactUnit> result = incrementalClusterStrategy.pickFullCompaction(3, runs); + assertThat(result.isPresent()).isTrue(); + assertThat(result.get().outputLevel()).isEqualTo(maxLevel); + assertThat(result.get().files()).hasSize(1); + assertThat(result.get().files().get(0).fileSize()).isEqualTo(1); + } + + @Test + public void testPickFullCompactionWithMultipleRuns() { + // Test case: multiple runs should return compaction + int maxLevel = 2; + DataFileMeta file1 = createFile(1, 0L, 0); + DataFileMeta file2 = createFile(2, 1L, 1); + DataFileMeta file3 = createFile(3, 0L, maxLevel); + + LevelSortedRun run1 = new LevelSortedRun(0, SortedRun.fromSingle(file1)); + LevelSortedRun run2 = new LevelSortedRun(1, SortedRun.fromSingle(file2)); + LevelSortedRun run3 = new LevelSortedRun(maxLevel, SortedRun.fromSingle(file3)); + + List<LevelSortedRun> runs = Arrays.asList(run1, run2, run3); + + Optional<CompactUnit> result = incrementalClusterStrategy.pickFullCompaction(3, runs); 
+ assertThat(result.isPresent()).isTrue(); + assertThat(result.get().outputLevel()).isEqualTo(maxLevel); + assertThat(result.get().files()).hasSize(3); + + long[] fileSizes = + result.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray(); + assertThat(fileSizes).isEqualTo(new long[] {1, 2, 3}); + } + + @Test + public void testPickFullCompactionWithDifferentNumLevels() { + // Test case: different number of levels + DataFileMeta file1 = createFile(1, 0L, 0); + DataFileMeta file2 = createFile(2, 1L, 1); + + LevelSortedRun run1 = new LevelSortedRun(0, SortedRun.fromSingle(file1)); + LevelSortedRun run2 = new LevelSortedRun(1, SortedRun.fromSingle(file2)); + + List<LevelSortedRun> runs = Arrays.asList(run1, run2); + + // Test with numLevels = 5, maxLevel should be 4 + Optional<CompactUnit> result = incrementalClusterStrategy.pickFullCompaction(5, runs); + assertThat(result.isPresent()).isTrue(); + assertThat(result.get().outputLevel()).isEqualTo(4); // maxLevel = numLevels - 1 + assertThat(result.get().files()).hasSize(2); + } + + @Test + public void testPickFullCompactionWithMixedSchemas() { + // Test case: runs with mixed schemas (some same, some different cluster keys) + int maxLevel = 2; + DataFileMeta file1 = createFile(1, 0L, 0); // schema-0: f0,f1 (same as clusterKeys) + DataFileMeta file2 = createFile(2, 1L, 1); // schema-1: f2,f3 (different from clusterKeys) + DataFileMeta file3 = createFile(3, 0L, maxLevel); // schema-0: f0,f1 (same as clusterKeys) + + LevelSortedRun run1 = new LevelSortedRun(0, SortedRun.fromSingle(file1)); + LevelSortedRun run2 = new LevelSortedRun(1, SortedRun.fromSingle(file2)); + LevelSortedRun run3 = new LevelSortedRun(maxLevel, SortedRun.fromSingle(file3)); + + List<LevelSortedRun> runs = Arrays.asList(run1, run2, run3); + + Optional<CompactUnit> result = incrementalClusterStrategy.pickFullCompaction(3, runs); + assertThat(result.isPresent()).isTrue(); + assertThat(result.get().outputLevel()).isEqualTo(maxLevel); + assertThat(result.get().files()).hasSize(3); + } + + private static void prepareSchema() throws Exception { + // schema-0 + Map<String, String> options = new HashMap<>(); + options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1"); + Schema schema = + new Schema( + RowType.of( + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE) + .getFields(), + emptyList(), + emptyList(), + options, + ""); + schemaManager.createTable(schema); + // schema-1 + schemaManager.commitChanges( + SchemaChange.setOption(CoreOptions.CLUSTERING_COLUMNS.key(), "f2,f3")); + } + + private static DataFileMeta createFile(long size, long schemaId, int level) { + return DataFileMeta.create( + "", + size, + 1, + null, + null, + null, + null, + 0, + 0, + schemaId, + level, + null, + null, + FileSource.APPEND, + null, + null, + null); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 3b10138350..b9bfaac7f8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -100,6 +100,10 @@ public class CompactAction extends TableActionBase { checkArgument( !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(), "Compact action does not support data evolution table yet. 
"); + checkArgument( + !(((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE + && ((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled()), + "The table has enabled incremental clustering, and do not support compact in flink yet."); HashMap<String, String> dynamicOptions = new HashMap<>(tableConf); dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false"); table = table.copy(dynamicOptions); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java index b95318df79..ccad6b35ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java @@ -23,6 +23,7 @@ import org.apache.paimon.flink.compact.AppendPreCommitCompactCoordinatorOperator import org.apache.paimon.flink.compact.AppendPreCommitCompactWorkerOperator; import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory; import org.apache.paimon.options.Options; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -98,7 +99,10 @@ public abstract class AppendTableSink<T> extends FlinkWriteSink<T> { } boolean enableCompaction = - !table.coreOptions().writeOnly() && !table.coreOptions().dataEvolutionEnabled(); + !table.coreOptions().writeOnly() + && !table.coreOptions().dataEvolutionEnabled() + && !(table.bucketMode() == BucketMode.BUCKET_UNAWARE + && table.coreOptions().clusteringIncrementalEnabled()); boolean isStreamingMode = input.getExecutionEnvironment() .getConfiguration() diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 3a33fe008b..299fb6f6d7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -23,8 +23,13 @@ import org.apache.paimon.CoreOptions.OrderType; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.append.AppendCompactCoordinator; import org.apache.paimon.append.AppendCompactTask; +import org.apache.paimon.append.cluster.IncrementalClusterManager; +import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.operation.BaseAppendFileStoreWrite; import org.apache.paimon.partition.PartitionPredicate; @@ -42,6 +47,7 @@ import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.CommitMessageSerializer; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.source.DataSplit; @@ -92,6 +98,9 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; 
+import scala.collection.JavaConverters; +import scala.collection.Seq; + import static org.apache.paimon.CoreOptions.createCommitUser; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -146,8 +155,7 @@ public class CompactProcedure extends BaseProcedure { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String partitions = blank(args, 1) ? null : args.getString(1); - // make full compact strategy as default. - String compactStrategy = blank(args, 2) ? FULL : args.getString(2); + String compactStrategy = blank(args, 2) ? null : args.getString(2); String sortType = blank(args, 3) ? OrderType.NONE.name() : args.getString(3); List<String> sortColumns = blank(args, 4) @@ -166,7 +174,9 @@ public class CompactProcedure extends BaseProcedure { "sort compact do not support 'partition_idle_time'."); } - if (!(compactStrategy.equalsIgnoreCase(FULL) || compactStrategy.equalsIgnoreCase(MINOR))) { + if (!(compactStrategy == null + || compactStrategy.equalsIgnoreCase(FULL) + || compactStrategy.equalsIgnoreCase(MINOR))) { throw new IllegalArgumentException( String.format( "The compact strategy only supports 'full' or 'minor', but '%s' is configured.", @@ -205,6 +215,12 @@ public class CompactProcedure extends BaseProcedure { dynamicOptions, CoreOptions.WRITE_ONLY.key(), "false"); ProcedureUtils.putAllOptions(dynamicOptions, options); table = table.copy(dynamicOptions); + if (((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled() + && (!OrderType.NONE.name().equals(sortType))) { + throw new IllegalArgumentException( + "The table has enabled incremental clustering, do not support sort compact."); + } + InternalRow internalRow = newInternalRow( execute( @@ -238,6 +254,13 @@ public class CompactProcedure extends BaseProcedure { @Nullable Duration partitionIdleTime) { BucketMode bucketMode = table.bucketMode(); OrderType orderType = OrderType.of(sortType); + + boolean clusterIncrementalEnabled = table.coreOptions().clusteringIncrementalEnabled(); + if (compactStrategy == null) { + // make full compact strategy as default for compact. + // make non-full compact strategy as default for incremental clustering. + compactStrategy = clusterIncrementalEnabled ? 
MINOR : FULL; + } boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL); RowType partitionType = table.schema().logicalPartitionType(); Predicate filter = @@ -251,6 +274,7 @@ public class CompactProcedure extends BaseProcedure { .getOrElse(null); PartitionPredicate partitionPredicate = PartitionPredicate.fromPredicate(partitionType, filter); + if (orderType.equals(OrderType.NONE)) { JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext()); switch (bucketMode) { @@ -264,8 +288,12 @@ public class CompactProcedure extends BaseProcedure { javaSparkContext); break; case BUCKET_UNAWARE: - compactUnAwareBucketTable( - table, partitionPredicate, partitionIdleTime, javaSparkContext); + if (clusterIncrementalEnabled) { + clusterIncrementalUnAwareBucketTable(table, fullCompact, relation); + } else { + compactUnAwareBucketTable( + table, partitionPredicate, partitionIdleTime, javaSparkContext); + } break; default: throw new UnsupportedOperationException( @@ -521,6 +549,99 @@ public class CompactProcedure extends BaseProcedure { } } + private void clusterIncrementalUnAwareBucketTable( + FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) { + IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); + Map<BinaryRow, CompactUnit> compactUnits = + incrementalClusterManager.prepareForCluster(fullCompaction); + + // generate splits for each partition + Map<BinaryRow, DataSplit[]> partitionSplits = + compactUnits.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + incrementalClusterManager + .toSplits( + entry.getKey(), + entry.getValue().files()) + .toArray(new DataSplit[0]))); + + // sort in partition + TableSorter sorter = + TableSorter.getSorter( + table, + incrementalClusterManager.clusterCurve(), + incrementalClusterManager.clusterKeys()); + LOG.info( + "Start to sort in partition, cluster curve is {}, cluster keys is {}", + incrementalClusterManager.clusterCurve(), + incrementalClusterManager.clusterKeys()); + + Dataset<Row> datasetForWrite = + partitionSplits.values().stream() + .map( + split -> { + Dataset<Row> dataset = + PaimonUtils.createDataset( + spark(), + ScanPlanHelper$.MODULE$.createNewScanPlan( + split, relation)); + return sorter.sort(dataset); + }) + .reduce(Dataset::union) + .orElse(null); + if (datasetForWrite != null) { + // set to write only to prevent invoking compaction + // do not use overwrite, we don't need to overwrite the whole partition + PaimonSparkWriter writer = PaimonSparkWriter.apply(table).writeOnly(); + Seq<CommitMessage> commitMessages = writer.write(datasetForWrite); + if (LOG.isDebugEnabled()) { + LOG.debug("Commit messages after writing:{}", commitMessages); + } + + // re-organize the commit messages to generate the compact messages + Map<BinaryRow, List<DataFileMeta>> partitionClustered = new HashMap<>(); + for (CommitMessage commitMessage : JavaConverters.seqAsJavaList(commitMessages)) { + checkArgument(commitMessage.bucket() == 0); + partitionClustered + .computeIfAbsent(commitMessage.partition(), k -> new ArrayList<>()) + .addAll(((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles()); + } + + List<CommitMessage> clusterMessages = new ArrayList<>(); + for (Map.Entry<BinaryRow, List<DataFileMeta>> entry : partitionClustered.entrySet()) { + BinaryRow partition = entry.getKey(); + List<DataFileMeta> clusterBefore = compactUnits.get(partition).files(); + // upgrade the clustered file to outputLevel + List<DataFileMeta> clusterAfter 
= + incrementalClusterManager.upgrade( + entry.getValue(), compactUnits.get(partition).outputLevel()); + LOG.info( + "Partition {}: upgrade file level to {}", + partition, + compactUnits.get(partition).outputLevel()); + CompactIncrement compactIncrement = + new CompactIncrement(clusterBefore, clusterAfter, Collections.emptyList()); + clusterMessages.add( + new CommitMessageImpl( + partition, + // bucket 0 is bucket for unaware-bucket table + // for compatibility with the old design + 0, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + compactIncrement)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Commit messages after reorganizing:{}", clusterMessages); + } + + writer.commit(JavaConverters.asScalaBuffer(clusterMessages).toSeq()); + } + } + private Map<BinaryRow, DataSplit[]> packForSort(List<DataSplit> dataSplits) { // Make a single partition as a compact group return dataSplits.stream() diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 2f6b743f5c..246245c052 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -258,7 +258,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = } } val clusteringColumns = coreOptions.clusteringColumns() - if (!clusteringColumns.isEmpty) { + if ((!coreOptions.clusteringIncrementalEnabled()) && (!clusteringColumns.isEmpty)) { val strategy = coreOptions.clusteringStrategy(tableSchema.fields().size()) val sorter = TableSorter.getSorter(table, strategy, clusteringColumns) input = sorter.sort(data) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index 8d1b35cc12..96f85ba757 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -24,7 +24,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.source.DataSplit -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerStageSubmitted} +import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted} import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest @@ -33,6 +33,7 @@ import org.assertj.core.api.Assertions import java.util import scala.collection.JavaConverters._ +import scala.util.Random /** Test compact procedure. See [[CompactProcedure]]. 
*/ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamTest { @@ -795,6 +796,340 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT } } + test("Paimon Procedure: cluster for unpartitioned table") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql( + s""" + |CREATE TABLE T (a INT, b INT, c STRING) + |TBLPROPERTIES ('bucket'='-1','num-levels'='6', 'num-sorted-run.compaction-trigger'='2', 'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b", "c") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + val random = new Random() + val randomStr = random.nextString(40) + // first write + inputData.addData((0, 0, randomStr)) + inputData.addData((0, 1, randomStr)) + inputData.addData((0, 2, randomStr)) + inputData.addData((1, 0, randomStr)) + inputData.addData((1, 1, randomStr)) + inputData.addData((1, 2, randomStr)) + inputData.addData((2, 0, randomStr)) + inputData.addData((2, 1, randomStr)) + inputData.addData((2, 2, randomStr)) + stream.processAllAvailable() + + val result = new util.ArrayList[Row]() + for (a <- 0 until 3) { + for (b <- 0 until 3) { + result.add(Row(a, b, randomStr)) + } + } + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + + // first cluster, the outputLevel should be 5 + checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) + + // first cluster result + val result2 = new util.ArrayList[Row]() + result2.add(0, Row(0, 0, randomStr)) + result2.add(1, Row(0, 1, randomStr)) + result2.add(2, Row(1, 0, randomStr)) + result2.add(3, Row(1, 1, randomStr)) + result2.add(4, Row(0, 2, randomStr)) + result2.add(5, Row(1, 2, randomStr)) + result2.add(6, Row(2, 0, randomStr)) + result2.add(7, Row(2, 1, randomStr)) + result2.add(8, Row(2, 2, randomStr)) + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + + var clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + Assertions.assertThat(dataSplits.size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5) + + // second write + inputData.addData((0, 3, null), (1, 3, null), (2, 3, null)) + inputData.addData((3, 0, null), (3, 1, null), (3, 2, null), (3, 3, null)) + stream.processAllAvailable() + + val result3 = new util.ArrayList[Row]() + result3.addAll(result2) + for (a <- 0 until 3) { + result3.add(Row(a, 3, null)) + } + for (b <- 0 until 4) { + result3.add(Row(3, b, null)) + } + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + + // second cluster, the outputLevel should be 4 + checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) + // second cluster result, level-5 and level-4 are individually ordered + val result4 = new util.ArrayList[Row]() + result4.addAll(result2) + result4.add(Row(0, 3, null)) + result4.add(Row(1, 3, null)) + result4.add(Row(3, 0, null)) + result4.add(Row(3, 1, 
null)) + result4.add(Row(2, 3, null)) + result4.add(Row(3, 2, null)) + result4.add(Row(3, 3, null)) + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4) + + clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + Assertions.assertThat(dataSplits.size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(2) + Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5) + Assertions.assertThat(dataSplits.get(0).dataFiles().get(1).level()).isEqualTo(4) + + // full cluster + checkAnswer( + spark.sql("CALL paimon.sys.compact(table => 'T', compact_strategy => 'full')"), + Row(true) :: Nil) + val result5 = new util.ArrayList[Row]() + result5.add(Row(0, 0, randomStr)) + result5.add(Row(0, 1, randomStr)) + result5.add(Row(1, 0, randomStr)) + result5.add(Row(1, 1, randomStr)) + result5.add(Row(0, 2, randomStr)) + result5.add(Row(0, 3, null)) + result5.add(Row(1, 2, randomStr)) + result5.add(Row(1, 3, null)) + result5.add(Row(2, 0, randomStr)) + result5.add(Row(2, 1, randomStr)) + result5.add(Row(3, 0, null)) + result5.add(Row(3, 1, null)) + result5.add(Row(2, 2, randomStr)) + result5.add(Row(2, 3, null)) + result5.add(Row(3, 2, null)) + result5.add(Row(3, 3, null)) + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result5) + + clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + Assertions.assertThat(dataSplits.size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1) + Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5) + + } finally { + stream.stop() + } + } + } + } + + test("Paimon Procedure: cluster for partitioned table") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql( + s""" + |CREATE TABLE T (a INT, b INT, c STRING, pt INT) + |PARTITIONED BY (pt) + |TBLPROPERTIES ('bucket'='-1', 'num-levels'='6', 'num-sorted-run.compaction-trigger'='2', 'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, Int, String, Int)] + val stream = inputData + .toDS() + .toDF("a", "b", "c", "pt") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY pt") + + try { + val random = new Random() + val randomStr = random.nextString(50) + // first write + for (pt <- 0 until 2) { + val c = if (pt == 0) randomStr else null + inputData.addData((0, 0, c, pt)) + inputData.addData((0, 1, c, pt)) + inputData.addData((0, 2, c, pt)) + inputData.addData((1, 0, c, pt)) + inputData.addData((1, 1, c, pt)) + inputData.addData((1, 2, c, pt)) + inputData.addData((2, 0, c, pt)) + inputData.addData((2, 1, c, pt)) + inputData.addData((2, 2, c, pt)) + } + stream.processAllAvailable() + + val result = new util.ArrayList[Row]() + for (pt <- 0 until 2) { + for (a <- 0 until 3) { + for (b <- 0 until 3) { + val c = if (pt == 0) randomStr else null + result.add(Row(a, b, c, pt)) + } + } + } + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + + // first cluster, the outputLevel should be 5 + 
checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) + + // first cluster result + val result2 = new util.ArrayList[Row]() + for (pt <- 0 until 2) { + val c = if (pt == 0) randomStr else null + result2.add(Row(0, 0, c, pt)) + result2.add(Row(0, 1, c, pt)) + result2.add(Row(1, 0, c, pt)) + result2.add(Row(1, 1, c, pt)) + result2.add(Row(0, 2, c, pt)) + result2.add(Row(1, 2, c, pt)) + result2.add(Row(2, 0, c, pt)) + result2.add(Row(2, 1, c, pt)) + result2.add(Row(2, 2, c, pt)) + } + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + + var clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + Assertions.assertThat(dataSplits.size()).isEqualTo(2) + dataSplits.forEach( + dataSplit => { + Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(1) + Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5) + }) + + // second write + for (pt <- 0 until 2) { + inputData.addData((0, 3, null, pt), (1, 3, null, pt), (2, 3, null, pt)) + inputData.addData( + (3, 0, null, pt), + (3, 1, null, pt), + (3, 2, null, pt), + (3, 3, null, pt)) + } + stream.processAllAvailable() + + val result3 = new util.ArrayList[Row]() + for (pt <- 0 until 2) { + val c = if (pt == 0) randomStr else null + result3.add(Row(0, 0, c, pt)) + result3.add(Row(0, 1, c, pt)) + result3.add(Row(1, 0, c, pt)) + result3.add(Row(1, 1, c, pt)) + result3.add(Row(0, 2, c, pt)) + result3.add(Row(1, 2, c, pt)) + result3.add(Row(2, 0, c, pt)) + result3.add(Row(2, 1, c, pt)) + result3.add(Row(2, 2, c, pt)) + for (a <- 0 until 3) { + result3.add(Row(a, 3, null, pt)) + } + for (b <- 0 until 4) { + result3.add(Row(3, b, null, pt)) + } + } + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + + // second cluster + checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) + val result4 = new util.ArrayList[Row]() + // for partition-0: only file in level-0 will be picked for clustering, outputLevel is 4 + result4.add(Row(0, 0, randomStr, 0)) + result4.add(Row(0, 1, randomStr, 0)) + result4.add(Row(1, 0, randomStr, 0)) + result4.add(Row(1, 1, randomStr, 0)) + result4.add(Row(0, 2, randomStr, 0)) + result4.add(Row(1, 2, randomStr, 0)) + result4.add(Row(2, 0, randomStr, 0)) + result4.add(Row(2, 1, randomStr, 0)) + result4.add(Row(2, 2, randomStr, 0)) + result4.add(Row(0, 3, null, 0)) + result4.add(Row(1, 3, null, 0)) + result4.add(Row(3, 0, null, 0)) + result4.add(Row(3, 1, null, 0)) + result4.add(Row(2, 3, null, 0)) + result4.add(Row(3, 2, null, 0)) + result4.add(Row(3, 3, null, 0)) + // for partition-1:all files will be picked for clustering, outputLevel is 5 + result4.add(Row(0, 0, null, 1)) + result4.add(Row(0, 1, null, 1)) + result4.add(Row(1, 0, null, 1)) + result4.add(Row(1, 1, null, 1)) + result4.add(Row(0, 2, null, 1)) + result4.add(Row(0, 3, null, 1)) + result4.add(Row(1, 2, null, 1)) + result4.add(Row(1, 3, null, 1)) + result4.add(Row(2, 0, null, 1)) + result4.add(Row(2, 1, null, 1)) + result4.add(Row(3, 0, null, 1)) + result4.add(Row(3, 1, null, 1)) + result4.add(Row(2, 2, null, 1)) + result4.add(Row(2, 3, null, 1)) + result4.add(Row(3, 2, null, 1)) + result4.add(Row(3, 3, null, 1)) + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4) + + clusteredTable = loadTable("T") + checkSnapshot(clusteredTable) + dataSplits = clusteredTable.newSnapshotReader().read().dataSplits() + 
Assertions.assertThat(dataSplits.size()).isEqualTo(2) + dataSplits.forEach( + dataSplit => { + if (dataSplit.partition().getInt(0) == 1) { + // partition-1 + Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(1) + Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5) + } else { + // partition-0 + Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(2) + Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5) + Assertions.assertThat(dataSplit.dataFiles().get(1).level()).isEqualTo(4) + } + }) + } finally { + stream.stop() + } + } + } + } + + def checkSnapshot(table: FileStoreTable): Unit = { + Assertions + .assertThat(table.latestSnapshot().get().commitKind().toString) + .isEqualTo(CommitKind.COMPACT.toString) + } + def lastSnapshotCommand(table: FileStoreTable): CommitKind = { table.snapshotManager().latestSnapshot().commitKind() }
