This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit af25ba1223d427f79ca47b94de5f8f9e70391abc
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 23 18:55:21 2025 +0800

    [core] Cross partition can work with fixed bucket and postpone bucket 
(#6307)
---
 .../content/primary-key-table/data-distribution.md | 44 +++++++++++-----------
 .../java/org/apache/paimon/table/BucketMode.java   | 19 +++++-----
 .../java/org/apache/paimon/KeyValueFileStore.java  |  4 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  8 ----
 .../paimon/table/AbstractFileStoreTable.java       |  2 +-
 .../org/apache/paimon/schema/TableSchemaTest.java  |  7 ----
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  2 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  2 +-
 .../org/apache/paimon/flink/CatalogITCaseBase.java | 12 +++++-
 ...eITCase.java => CrossPartitionTableITCase.java} | 35 ++++++++++++++++-
 .../paimon/flink/FlinkJobRecoveryITCase.java       |  2 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  4 +-
 12 files changed, 83 insertions(+), 58 deletions(-)

diff --git a/docs/content/primary-key-table/data-distribution.md 
b/docs/content/primary-key-table/data-distribution.md
index 1ae880786b..4a54afa7aa 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -55,8 +55,6 @@ Dynamic Bucket only support single write job. Please do not 
start multiple jobs
 (this can lead to duplicate data). Even if you enable `'write-only'` and start 
a dedicated compaction job, it won't work.
 {{< /hint >}}
 
-### Normal Dynamic Bucket Mode
-
 When your updates do not cross partitions (no partitions, or primary keys 
contain all partition fields), Dynamic
 Bucket mode uses HASH index to maintain mapping from key to bucket, it 
requires more memory than fixed bucket mode.
 
@@ -66,27 +64,6 @@ Performance:
    entries in a partition takes up **1 GB** more memory, partitions that are 
no longer active do not take up memory.
 2. For tables with low update rates, this mode is recommended to significantly 
improve performance.
 
-`Normal Dynamic Bucket Mode` supports sort-compact to speed up queries. See 
[Sort Compact]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}).
-
-### Cross Partitions Upsert Dynamic Bucket Mode
-
-When you need cross partition upsert (primary keys not contain all partition 
fields), Dynamic Bucket mode directly
-maintains the mapping of keys to partition and bucket, uses local disks, and 
initializes indexes by reading all
-existing keys in the table when starting stream write job. Different merge 
engines have different behaviors:
-
-1. Deduplicate: Delete data from the old partition and insert new data into 
the new partition.
-2. PartialUpdate & Aggregation: Insert new data into the old partition.
-3. FirstRow: Ignore new data if there is old value.
-
-Performance: For tables with a large amount of data, there will be a 
significant loss in performance. Moreover,
-initialization takes a long time.
-
-If your upsert does not rely on too old data, you can consider configuring 
index TTL to reduce Index and initialization time:
-- `'cross-partition-upsert.index-ttl'`: The TTL in rocksdb index and 
initialization, this can avoid maintaining too many
-  indexes and lead to worse and worse performance.
-
-But please note that this may also cause data duplication.
-
 ## Postpone Bucket
 
 Postpone bucket mode is configured by `'bucket' = '-2'`.
@@ -107,6 +84,27 @@ Finally, when you feel that the bucket number of some 
partition is too small,
 you can also run a rescale job.
 See `rescale` [procedure]({{< ref "flink/procedures" >}}).
 
+## Cross Partitions Upsert
+
+When you need cross partition upsert (primary keys not contain all partition 
fields), recommend using the '-1' bucket.
+Key Dynamic mode directly maintains the mapping of keys to partition and 
bucket, uses local disks, and initializes 
+indexes by reading all existing keys in the table when starting stream write 
job. Different merge engines have different behaviors:
+
+1. Deduplicate: Delete data from the old partition and insert new data into 
the new partition.
+2. PartialUpdate & Aggregation: Insert new data into the old partition.
+3. FirstRow: Ignore new data if there is old value.
+
+Performance: For tables with a large amount of data, there will be a 
significant loss in performance. Moreover,
+initialization takes a long time.
+
+If your upsert does not rely on too old data, you can consider configuring 
index TTL to reduce Index and initialization time:
+- `'cross-partition-upsert.index-ttl'`: The TTL in rocksdb index and 
initialization, this can avoid maintaining too many
+  indexes and lead to worse and worse performance.
+
+You can also use Cross Partitions Upsert with bucket (N > 0) or bucket (-2), 
in these modes, there is no global index to
+ensure that your data undergoes reasonable deduplication, so relying on your 
input to have a complete changelog can
+ensure the uniqueness of the data.
+
 ## Pick Partition Fields
 
 The following three types of fields may be defined as partition fields in the 
warehouse:
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java 
b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
index 277c4740c1..4740ba8a00 100644
--- a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -38,20 +38,21 @@ public enum BucketMode {
     HASH_FIXED,
 
     /**
-     * The dynamic bucket mode records which bucket the key corresponds to 
through the index files.
-     * The index records the correspondence between the hash value of the 
primary-key and the
-     * bucket. This mode cannot support multiple concurrent writes or bucket 
skipping for reading
-     * filter conditions. This mode only works for changelog table.
+     * Hash-Dynamic mode records the correspondence between the hash of the 
primary key and the
+     * bucket number. It is used to simplify the distribution of primary keys 
to buckets, but cannot
+     * support large amounts of data. It cannot support multiple concurrent 
writes or bucket
+     * skipping for reading filter. This mode only works for primary key table.
      */
     HASH_DYNAMIC,
 
     /**
-     * The cross partition mode is for cross partition upsert (primary keys 
not contain all
-     * partition fields). It directly maintains the mapping of primary keys to 
partition and bucket,
-     * uses local disks, and initializes indexes by reading all existing keys 
in the table when
-     * starting stream write job.
+     * Key-Dynamic mode records the correspondence between the primary key and 
the partition +
+     * bucket number. It is used to cross partition upsert (primary keys not 
contain all partition
+     * fields). It directly maintains the mapping of primary keys to partition 
and bucket using
+     * local disks, and initializes indexes by reading all existing keys in 
the table when starting
+     * write job.
      */
-    CROSS_PARTITION,
+    KEY_DYNAMIC,
 
     /**
      * Ignoring bucket concept, although all data is written to bucket-0, the 
parallelism of reads
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 5e5b354e65..077068df73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -54,7 +54,6 @@ import java.util.function.Supplier;
 import static org.apache.paimon.predicate.PredicateBuilder.and;
 import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link FileStore} for querying and updating {@link KeyValue}s. */
 public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
@@ -104,9 +103,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             case -2:
                 return BucketMode.POSTPONE_MODE;
             case -1:
-                return crossPartitionUpdate ? BucketMode.CROSS_PARTITION : 
BucketMode.HASH_DYNAMIC;
+                return crossPartitionUpdate ? BucketMode.KEY_DYNAMIC : 
BucketMode.HASH_DYNAMIC;
             default:
-                checkArgument(!crossPartitionUpdate);
                 return BucketMode.HASH_FIXED;
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index cbf3153dac..9d73081805 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -567,14 +567,6 @@ public class SchemaValidation {
         } else if (bucket < 1 && !isPostponeBucketTable(schema, bucket)) {
             throw new RuntimeException("The number of buckets needs to be 
greater than 0.");
         } else {
-            if (schema.crossPartitionUpdate()) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "You should use dynamic bucket (bucket = -1) 
mode in cross partition update case "
-                                        + "(Primary key constraint %s not 
include all partition fields %s).",
-                                schema.primaryKeys(), schema.partitionKeys()));
-            }
-
             if (schema.primaryKeys().isEmpty() && 
schema.bucketKeys().isEmpty()) {
                 throw new RuntimeException(
                         "You should define a 'bucket-key' for bucketed append 
mode.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index db0ca5e5ff..c1d34a99ef 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -238,7 +238,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
             case HASH_FIXED:
                 return new FixedBucketRowKeyExtractor(schema());
             case HASH_DYNAMIC:
-            case CROSS_PARTITION:
+            case KEY_DYNAMIC:
                 return new DynamicBucketRowKeyExtractor(schema());
             case BUCKET_UNAWARE:
                 return new AppendTableRowKeyExtractor(schema());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 7f0cd60425..275241455a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -77,13 +77,6 @@ public class TableSchemaTest {
                 new TableSchema(1, fields, 10, partitionKeys, primaryKeys, 
options, "");
         assertThat(schema.crossPartitionUpdate()).isTrue();
 
-        options.put(BUCKET.key(), "1");
-        assertThatThrownBy(() -> validateTableSchema(schema))
-                .hasMessageContaining("You should use dynamic bucket");
-
-        options.put(BUCKET.key(), "-1");
-        validateTableSchema(schema);
-
         options.put(SEQUENCE_FIELD.key(), "f2");
         assertThatThrownBy(() -> validateTableSchema(schema))
                 .hasMessageContaining("You can not use sequence.field");
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index d5c33da412..f5cd33f019 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -270,7 +270,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                 case BUCKET_UNAWARE:
                     buildForUnawareBucket(table, converted);
                     break;
-                case CROSS_PARTITION:
+                case KEY_DYNAMIC:
                 default:
                     throw new UnsupportedOperationException(
                             "Unsupported bucket mode: " + bucketMode);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 847cf06990..4f40a5c5bc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -224,7 +224,7 @@ public class FlinkSinkBuilder {
                 return buildForFixedBucket(input);
             case HASH_DYNAMIC:
                 return buildDynamicBucketSink(input, false);
-            case CROSS_PARTITION:
+            case KEY_DYNAMIC:
                 return buildDynamicBucketSink(input, true);
             case BUCKET_UNAWARE:
                 return buildUnawareBucketSink(input);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index ddf8b1e3fc..b2bf1d7c90 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -31,6 +31,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -69,7 +70,11 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
 
     @BeforeEach
     public void before() throws IOException {
-        tEnv = tableEnvironmentBuilder().batchMode().build();
+        TableEnvironmentBuilder tBuilder = 
tableEnvironmentBuilder().batchMode();
+        if (sqlSyncMode() != null) {
+            tBuilder.setConf(TableConfigOptions.TABLE_DML_SYNC, sqlSyncMode());
+        }
+        tEnv = tBuilder.build();
         String catalog = "PAIMON";
         path = getTempDirPath();
         String inferScan =
@@ -97,6 +102,11 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
         prepareEnv();
     }
 
+    @Nullable
+    protected Boolean sqlSyncMode() {
+        return null;
+    }
+
     protected Map<String, String> catalogOptions() {
         return Collections.emptyMap();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
similarity index 84%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
index 210489592e..98747a31d6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
@@ -22,13 +22,16 @@ import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for batch file store. */
-public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
+public class CrossPartitionTableITCase extends CatalogITCaseBase {
 
     @Override
     protected List<String> ddl() {
@@ -66,6 +69,12 @@ public class GlobalDynamicBucketTableITCase extends 
CatalogITCaseBase {
                         + ")");
     }
 
+    @Nullable
+    @Override
+    protected Boolean sqlSyncMode() {
+        return true;
+    }
+
     @Test
     public void testBulkLoad() {
         sql("INSERT INTO T VALUES (1, 1, 1), (2, 1, 2), (1, 3, 3), (2, 4, 4), 
(3, 3, 5)");
@@ -172,4 +181,28 @@ public class GlobalDynamicBucketTableITCase extends 
CatalogITCaseBase {
         sql("insert into partial_part values (1, 2, 1, 2)");
         assertThat(sql("select * from 
partial_part")).containsExactlyInAnyOrder(Row.of(1, 2, 1, 2));
     }
+
+    @Test
+    public void testCrossPartitionWithFixedBucket() {
+        sql(
+                "create table cross_fixed (pt int, k int, v int, primary key 
(k) not enforced) "
+                        + "partitioned by (pt) with ('bucket' = '2')");
+        sql("insert into cross_fixed values (1, 1, 1)");
+        sql("insert into cross_fixed values (2, 2, 2)");
+        assertThat(sql("select * from cross_fixed"))
+                .containsExactlyInAnyOrder(Row.of(1, 1, 1), Row.of(2, 2, 2));
+    }
+
+    @Test
+    public void testCrossPartitionWithPostponeBucket()
+            throws ExecutionException, InterruptedException {
+        sql(
+                "create table cross_postpone (pt int, k int, v int, primary 
key (k) not enforced) "
+                        + "partitioned by (pt) with ('bucket' = '-2')");
+        sql("insert into cross_postpone values (1, 1, 1)");
+        sql("insert into cross_postpone values (2, 2, 2)");
+        tEnv.executeSql("CALL sys.compact(`table` => 
'default.cross_postpone')").await();
+        assertThat(sql("select * from cross_postpone"))
+                .containsExactlyInAnyOrder(Row.of(1, 1, 1), Row.of(2, 2, 2));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
index bad5998e39..fdd12ae2a7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
@@ -254,7 +254,7 @@ public class FlinkJobRecoveryITCase extends 
CatalogITCaseBase {
                                 "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 
STRING, pt STRING, PRIMARY KEY(k, pt) NOT ENFORCED) WITH ('bucket'='-1', 
'commit.force-create-snapshot'='true')",
                                 tableName));
                 return;
-            case CROSS_PARTITION:
+            case KEY_DYNAMIC:
                 batchSql(
                         String.format(
                                 "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 
STRING, pt STRING, PRIMARY KEY(k) NOT ENFORCED) WITH ('bucket'='-1', 
'commit.force-create-snapshot'='true')",
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 70f4676cad..2f6b743f5c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -88,7 +88,7 @@ case class PaimonSparkWriter(table: FileStoreTable, 
writeRowTracking: Boolean =
 
     val withInitBucketCol = bucketMode match {
       case BUCKET_UNAWARE => data
-      case CROSS_PARTITION if !data.schema.fieldNames.contains(ROW_KIND_COL) =>
+      case KEY_DYNAMIC if !data.schema.fieldNames.contains(ROW_KIND_COL) =>
         data
           .withColumn(ROW_KIND_COL, lit(RowKind.INSERT.toByteValue))
           .withColumn(BUCKET_COL, lit(-1))
@@ -165,7 +165,7 @@ case class PaimonSparkWriter(table: FileStoreTable, 
writeRowTracking: Boolean =
     }
 
     val written: Dataset[Array[Byte]] = bucketMode match {
-      case CROSS_PARTITION =>
+      case KEY_DYNAMIC =>
         // Topology: input -> bootstrap -> shuffle by key hash -> 
bucket-assigner -> shuffle by partition & bucket
         val rowType = 
SparkTypeUtils.toPaimonType(withInitBucketCol.schema).asInstanceOf[RowType]
         val assignerParallelism = 
Option(coreOptions.dynamicBucketAssignerParallelism)

Reply via email to