This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 411195c7bc [core] Add validation for 'pk-clustering-override' (#7528)
411195c7bc is described below

commit 411195c7bc185b8943d394ac0ba4f60ae730dfe7
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 25 22:29:10 2026 +0800

    [core] Add validation for 'pk-clustering-override' (#7528)
---
 .../primary-key-table/pk-clustering-override.md    | 13 ++-----
 .../ClusteringCompactManagerFactory.java           | 28 ++-------------
 .../org/apache/paimon/schema/SchemaValidation.java | 40 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 37 deletions(-)

diff --git a/docs/content/primary-key-table/pk-clustering-override.md 
b/docs/content/primary-key-table/pk-clustering-override.md
index 0c37456a9e..534c3c3785 100644
--- a/docs/content/primary-key-table/pk-clustering-override.md
+++ b/docs/content/primary-key-table/pk-clustering-override.md
@@ -53,7 +53,7 @@ CREATE TABLE my_table (
 After this, data files within each bucket will be physically sorted by `city` 
instead of `id`. Queries like
 `SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files 
by checking their min/max statistics
 on the clustering column.
-
+s
 ## How It Works
 
 PK Clustering Override replaces the default LSM compaction with a two-phase 
clustering compaction:
@@ -82,16 +82,6 @@ temporary files to reduce memory consumption, preventing OOM 
during multi-way me
 | `clustering.columns` | Must be set (one or more non-primary-key columns) |
 | `deletion-vectors.enabled` | Must be `true` |
 | `merge-engine` | `deduplicate` (default) or `first-row` only |
-| `sequence.fields` | Must **not** be set |
-| `record-level.expire-time` | Must **not** be set |
-
-## Related Options
-
-| Option | Default | Description |
-|--------|---------|-------------|
-| `clustering.columns` | (none) | Comma-separated column names used as the 
physical sort order for data files. |
-| `sort-spill-threshold` | (auto) | When the number of merge readers exceeds 
this value, smaller files are spilled to row-based temp files to reduce memory 
usage. |
-| `sort-spill-buffer-size` | `64 mb` | Buffer size used for external sort 
during Phase 1 rewrite. |
 
 ## When to Use
 
@@ -106,3 +96,4 @@ It is **not** suitable when:
 - Point lookups by primary key are the dominant access pattern (default LSM 
sort is already optimal).
 - You need `partial-update` or `aggregation` merge engine.
 - `sequence.fields` or `record-level.expire-time` is required.
+- Changelog producer`lookup` or `full-compaction` is required.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java
index b58f5941a0..f0b117df8b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import static 
org.apache.paimon.schema.SchemaValidation.validatePkClusteringOverride;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Factory to create {@link ClusteringCompactManager}. */
@@ -66,32 +67,7 @@ public class ClusteringCompactManagerFactory implements 
KvCompactionManagerFacto
         this.keyType = keyType;
         this.valueType = valueType;
         this.cacheManager = cacheManager;
-
-        if (options.clusteringColumns().isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Cannot support 'pk-clustering-override' mode without 
'clustering.columns'.");
-        }
-        if (!options.deletionVectorsEnabled()) {
-            throw new UnsupportedOperationException(
-                    "Cannot support deletion-vectors disabled in 
'pk-clustering-override' mode.");
-        }
-        if (options.recordLevelExpireTime() != null) {
-            throw new UnsupportedOperationException(
-                    "Cannot support record level expire time enabled in 
'pk-clustering-override' mode.");
-        }
-        if (options.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE
-                && options.mergeEngine() != CoreOptions.MergeEngine.FIRST_ROW) 
{
-            throw new UnsupportedOperationException(
-                    "Cannot support merge engine: "
-                            + options.mergeEngine()
-                            + " in 'pk-clustering-override' mode.");
-        }
-        if (!options.sequenceField().isEmpty()) {
-            throw new UnsupportedOperationException(
-                    "Cannot support sequence field: "
-                            + options.sequenceField()
-                            + " in 'pk-clustering-override' mode.");
-        }
+        validatePkClusteringOverride(options);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 689f65276b..130f9b29eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -282,6 +282,8 @@ public class SchemaValidation {
         validateChainTable(schema, options);
 
         validateChangelogReadSequenceNumber(schema, options);
+
+        validatePkClusteringOverride(options);
     }
 
     public static void validateFallbackBranch(SchemaManager schemaManager, 
TableSchema schema) {
@@ -795,4 +797,42 @@ public class SchemaValidation {
                     CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key());
         }
     }
+
+    public static void validatePkClusteringOverride(CoreOptions options) {
+        if (options.pkClusteringOverride()) {
+            if (options.clusteringColumns().isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Cannot support 'pk-clustering-override' mode without 
'clustering.columns'.");
+            }
+            if (!options.deletionVectorsEnabled()) {
+                throw new UnsupportedOperationException(
+                        "Cannot support deletion-vectors disabled in 
'pk-clustering-override' mode.");
+            }
+            if (options.recordLevelExpireTime() != null) {
+                throw new UnsupportedOperationException(
+                        "Cannot support record level expire time enabled in 
'pk-clustering-override' mode.");
+            }
+            if (options.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE
+                    && options.mergeEngine() != 
CoreOptions.MergeEngine.FIRST_ROW) {
+                throw new UnsupportedOperationException(
+                        "Cannot support merge engine: "
+                                + options.mergeEngine()
+                                + " in 'pk-clustering-override' mode.");
+            }
+            if (!options.sequenceField().isEmpty()) {
+                throw new UnsupportedOperationException(
+                        "Cannot support sequence field: "
+                                + options.sequenceField()
+                                + " in 'pk-clustering-override' mode.");
+            }
+            ChangelogProducer changelogProducer = options.changelogProducer();
+            if (changelogProducer != ChangelogProducer.NONE
+                    && changelogProducer != ChangelogProducer.INPUT) {
+                throw new UnsupportedOperationException(
+                        "Cannot support changelog producer: "
+                                + changelogProducer
+                                + " in 'pk-clustering-override' mode.");
+            }
+        }
+    }
 }

Reply via email to