This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit af25ba1223d427f79ca47b94de5f8f9e70391abc
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 23 18:55:21 2025 +0800

    [core] Cross partition can work with fixed bucket and postpone bucket (#6307)
---
 .../content/primary-key-table/data-distribution.md | 44 +++++++++++-----------
 .../java/org/apache/paimon/table/BucketMode.java   | 19 +++++-----
 .../java/org/apache/paimon/KeyValueFileStore.java  |  4 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  8 ----
 .../paimon/table/AbstractFileStoreTable.java       |  2 +-
 .../org/apache/paimon/schema/TableSchemaTest.java  |  7 ----
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  2 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  2 +-
 .../org/apache/paimon/flink/CatalogITCaseBase.java | 12 +++++-
 ...eITCase.java => CrossPartitionTableITCase.java} | 35 ++++++++++++++++-
 .../paimon/flink/FlinkJobRecoveryITCase.java       |  2 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  4 +-
 12 files changed, 83 insertions(+), 58 deletions(-)

diff --git a/docs/content/primary-key-table/data-distribution.md b/docs/content/primary-key-table/data-distribution.md
index 1ae880786b..4a54afa7aa 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -55,8 +55,6 @@ Dynamic Bucket only support single write job. Please do not start multiple jobs
 (this can lead to duplicate data). Even if you enable `'write-only'` and start a dedicated compaction job, it won't work.
 {{< /hint >}}
 
-### Normal Dynamic Bucket Mode
-
 When your updates do not cross partitions (no partitions, or primary keys contain all partition fields), Dynamic
 Bucket mode uses HASH index to maintain mapping from key to bucket, it requires more memory than fixed bucket mode.
 
@@ -66,27 +64,6 @@ Performance:
    entries in a partition takes up **1 GB** more memory, partitions that are no longer active do not take up memory.
 2. For tables with low update rates, this mode is recommended to significantly improve performance.
 
-`Normal Dynamic Bucket Mode` supports sort-compact to speed up queries. See [Sort Compact]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}).
-
-### Cross Partitions Upsert Dynamic Bucket Mode
-
-When you need cross partition upsert (primary keys not contain all partition fields), Dynamic Bucket mode directly
-maintains the mapping of keys to partition and bucket, uses local disks, and initializes indexes by reading all
-existing keys in the table when starting stream write job. Different merge engines have different behaviors:
-
-1. Deduplicate: Delete data from the old partition and insert new data into the new partition.
-2. PartialUpdate & Aggregation: Insert new data into the old partition.
-3. FirstRow: Ignore new data if there is old value.
-
-Performance: For tables with a large amount of data, there will be a significant loss in performance. Moreover,
-initialization takes a long time.
-
-If your upsert does not rely on too old data, you can consider configuring index TTL to reduce Index and initialization time:
-- `'cross-partition-upsert.index-ttl'`: The TTL in rocksdb index and initialization, this can avoid maintaining too many
-  indexes and lead to worse and worse performance.
-
-But please note that this may also cause data duplication.
-
 ## Postpone Bucket
 
 Postpone bucket mode is configured by `'bucket' = '-2'`.
@@ -107,6 +84,27 @@ Finally, when you feel that the bucket number of some partition is too small,
 you can also run a rescale job. See `rescale` [procedure]({{< ref "flink/procedures" >}}).
 
+## Cross Partitions Upsert
+
+When you need cross partition upsert (the primary key does not contain all partition fields), we recommend using
+`'bucket' = '-1'`. Key-Dynamic mode directly maintains the mapping of keys to partition and bucket, uses local disks,
+and initializes indexes by reading all existing keys in the table when the streaming write job starts. Different
+merge engines behave differently:
+
+1. Deduplicate: Delete the data from the old partition and insert the new data into the new partition.
+2. PartialUpdate & Aggregation: Insert the new data into the old partition.
+3. FirstRow: Ignore the new data if an old value exists.
+
+Performance: For tables with a large amount of data, there will be a significant loss in performance. Moreover,
+initialization takes a long time.
+
+If your upsert does not rely on very old data, you can configure an index TTL to reduce the index size and the
+initialization time:
+- `'cross-partition-upsert.index-ttl'`: The TTL of the RocksDB index and the initialization. This avoids maintaining
+  too many indexes, which would otherwise degrade performance over time.
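+
+As a minimal sketch of such a table (the table and column names below are illustrative, not from this commit): the
+primary key omits the partition field, `'bucket' = '-1'` enables Key-Dynamic mode, and the TTL option above bounds
+the index:
+
+```sql
+CREATE TABLE orders (
+    order_id BIGINT,
+    dt STRING,
+    amount INT,
+    PRIMARY KEY (order_id) NOT ENFORCED -- does not contain the partition field `dt`
+) PARTITIONED BY (dt) WITH (
+    'bucket' = '-1',                           -- Key-Dynamic mode: cross partition upsert
+    'cross-partition-upsert.index-ttl' = '7 d' -- optional: expire index entries after 7 days
+);
+```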
+
+You can also use cross partition upsert with a fixed bucket (N > 0) or with the postpone bucket (-2). In these modes
+there is no global index to deduplicate your data, so the uniqueness of the data relies on your input providing a
+complete changelog.
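+
+For example, with a fixed bucket number (a sketch assuming the input already carries a complete changelog; the table
+name mirrors the test added in this commit):
+
+```sql
+CREATE TABLE cross_fixed (
+    pt INT,
+    k INT,
+    v INT,
+    PRIMARY KEY (k) NOT ENFORCED
+) PARTITIONED BY (pt) WITH (
+    'bucket' = '2' -- fixed bucket; no global index, so no cross partition deduplication
+);
+```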
+
 ## Pick Partition Fields
 
 The following three types of fields may be defined as partition fields in the warehouse:
diff --git a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
index 277c4740c1..4740ba8a00 100644
--- a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -38,20 +38,21 @@ public enum BucketMode {
     HASH_FIXED,
 
     /**
-     * The dynamic bucket mode records which bucket the key corresponds to through the index files.
-     * The index records the correspondence between the hash value of the primary-key and the
-     * bucket. This mode cannot support multiple concurrent writes or bucket skipping for reading
-     * filter conditions. This mode only works for changelog table.
+     * Hash-Dynamic mode records the correspondence between the hash of the primary key and the
+     * bucket number. It simplifies the distribution of primary keys to buckets, but cannot
+     * support large amounts of data. It supports neither multiple concurrent writes nor bucket
+     * skipping for read filters. This mode only works for primary key tables.
      */
     HASH_DYNAMIC,
 
     /**
-     * The cross partition mode is for cross partition upsert (primary keys not contain all
-     * partition fields). It directly maintains the mapping of primary keys to partition and bucket,
-     * uses local disks, and initializes indexes by reading all existing keys in the table when
-     * starting stream write job.
+     * Key-Dynamic mode records the correspondence between the primary key and the partition +
+     * bucket number. It is used for cross partition upsert (the primary key does not contain all
+     * partition fields). It directly maintains the mapping of primary keys to partition and
+     * bucket using local disks, and initializes indexes by reading all existing keys in the
+     * table when the write job starts.
      */
-    CROSS_PARTITION,
+    KEY_DYNAMIC,
 
     /**
      * Ignoring bucket concept, although all data is written to bucket-0, the parallelism of reads
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 5e5b354e65..077068df73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -54,7 +54,6 @@ import java.util.function.Supplier;
 import static org.apache.paimon.predicate.PredicateBuilder.and;
 import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link FileStore} for querying and updating {@link KeyValue}s. */
 public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
@@ -104,9 +103,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
             case -2:
                 return BucketMode.POSTPONE_MODE;
             case -1:
-                return crossPartitionUpdate ? BucketMode.CROSS_PARTITION : BucketMode.HASH_DYNAMIC;
+                return crossPartitionUpdate ? BucketMode.KEY_DYNAMIC : BucketMode.HASH_DYNAMIC;
             default:
-                checkArgument(!crossPartitionUpdate);
                 return BucketMode.HASH_FIXED;
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index cbf3153dac..9d73081805 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -567,14 +567,6 @@ public class SchemaValidation {
         } else if (bucket < 1 && !isPostponeBucketTable(schema, bucket)) {
             throw new RuntimeException("The number of buckets needs to be greater than 0.");
         } else {
-            if (schema.crossPartitionUpdate()) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "You should use dynamic bucket (bucket = -1) mode in cross partition update case "
-                                        + "(Primary key constraint %s not include all partition fields %s).",
-                                schema.primaryKeys(), schema.partitionKeys()));
-            }
-
             if (schema.primaryKeys().isEmpty() && schema.bucketKeys().isEmpty()) {
                 throw new RuntimeException(
                         "You should define a 'bucket-key' for bucketed append mode.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index db0ca5e5ff..c1d34a99ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -238,7 +238,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
             case HASH_FIXED:
                 return new FixedBucketRowKeyExtractor(schema());
             case HASH_DYNAMIC:
-            case CROSS_PARTITION:
+            case KEY_DYNAMIC:
                 return new DynamicBucketRowKeyExtractor(schema());
             case BUCKET_UNAWARE:
                 return new AppendTableRowKeyExtractor(schema());
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 7f0cd60425..275241455a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -77,13 +77,6 @@ public class TableSchemaTest {
                 new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, "");
         assertThat(schema.crossPartitionUpdate()).isTrue();
 
-        options.put(BUCKET.key(), "1");
-        assertThatThrownBy(() -> validateTableSchema(schema))
-                .hasMessageContaining("You should use dynamic bucket");
-
-        options.put(BUCKET.key(), "-1");
-        validateTableSchema(schema);
-
         options.put(SEQUENCE_FIELD.key(), "f2");
         assertThatThrownBy(() -> validateTableSchema(schema))
                 .hasMessageContaining("You can not use sequence.field");
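
The validation removed above is what previously rejected fixed-bucket tables with cross partition update. A DDL like
the following sketch (illustrative names) used to fail with "You should use dynamic bucket (bucket = -1) mode in cross
partition update case" and is now accepted, resolving to HASH_FIXED in KeyValueFileStore#bucketMode:

```sql
CREATE TABLE t (
    pt INT,
    k INT,
    v INT,
    PRIMARY KEY (k) NOT ENFORCED -- does not include the partition field `pt`
) PARTITIONED BY (pt) WITH (
    'bucket' = '10' -- bucket > 0 with cross partition update is no longer rejected
);
```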
""); assertThat(schema.crossPartitionUpdate()).isTrue(); - options.put(BUCKET.key(), "1"); - assertThatThrownBy(() -> validateTableSchema(schema)) - .hasMessageContaining("You should use dynamic bucket"); - - options.put(BUCKET.key(), "-1"); - validateTableSchema(schema); - options.put(SEQUENCE_FIELD.key(), "f2"); assertThatThrownBy(() -> validateTableSchema(schema)) .hasMessageContaining("You can not use sequence.field"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index d5c33da412..f5cd33f019 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -270,7 +270,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> { case BUCKET_UNAWARE: buildForUnawareBucket(table, converted); break; - case CROSS_PARTITION: + case KEY_DYNAMIC: default: throw new UnsupportedOperationException( "Unsupported bucket mode: " + bucketMode); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 847cf06990..4f40a5c5bc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -224,7 +224,7 @@ public class FlinkSinkBuilder { return buildForFixedBucket(input); case HASH_DYNAMIC: return buildDynamicBucketSink(input, false); - case CROSS_PARTITION: + case KEY_DYNAMIC: return buildDynamicBucketSink(input, true); case BUCKET_UNAWARE: return buildUnawareBucketSink(input); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index ddf8b1e3fc..b2bf1d7c90 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -31,6 +31,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -69,7 +70,11 @@ public abstract class CatalogITCaseBase extends AbstractTestBase { @BeforeEach public void before() throws IOException { - tEnv = tableEnvironmentBuilder().batchMode().build(); + TableEnvironmentBuilder tBuilder = tableEnvironmentBuilder().batchMode(); + if (sqlSyncMode() != null) { + tBuilder.setConf(TableConfigOptions.TABLE_DML_SYNC, sqlSyncMode()); + } + tEnv = tBuilder.build(); String catalog = "PAIMON"; path = getTempDirPath(); String inferScan = @@ -97,6 +102,11 @@ public abstract class CatalogITCaseBase extends AbstractTestBase { prepareEnv(); } + @Nullable + protected Boolean sqlSyncMode() { + return null; + } + protected Map<String, String> catalogOptions() { 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
similarity index 84%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
index 210489592e..98747a31d6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
@@ -22,13 +22,16 @@ import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for batch file store. */
-public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
+public class CrossPartitionTableITCase extends CatalogITCaseBase {
 
     @Override
     protected List<String> ddl() {
@@ -66,6 +69,12 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
                         + ")");
     }
 
+    @Nullable
+    @Override
+    protected Boolean sqlSyncMode() {
+        return true;
+    }
+
     @Test
     public void testBulkLoad() {
         sql("INSERT INTO T VALUES (1, 1, 1), (2, 1, 2), (1, 3, 3), (2, 4, 4), (3, 3, 5)");
@@ -172,4 +181,28 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
         sql("insert into partial_part values (1, 2, 1, 2)");
         assertThat(sql("select * from partial_part")).containsExactlyInAnyOrder(Row.of(1, 2, 1, 2));
     }
+
+    @Test
+    public void testCrossPartitionWithFixedBucket() {
+        sql(
+                "create table cross_fixed (pt int, k int, v int, primary key (k) not enforced) "
+                        + "partitioned by (pt) with ('bucket' = '2')");
+        sql("insert into cross_fixed values (1, 1, 1)");
+        sql("insert into cross_fixed values (2, 2, 2)");
+        assertThat(sql("select * from cross_fixed"))
+                .containsExactlyInAnyOrder(Row.of(1, 1, 1), Row.of(2, 2, 2));
+    }
+
+    @Test
+    public void testCrossPartitionWithPostponeBucket()
+            throws ExecutionException, InterruptedException {
+        sql(
+                "create table cross_postpone (pt int, k int, v int, primary key (k) not enforced) "
+                        + "partitioned by (pt) with ('bucket' = '-2')");
+        sql("insert into cross_postpone values (1, 1, 1)");
+        sql("insert into cross_postpone values (2, 2, 2)");
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.cross_postpone')").await();
+        assertThat(sql("select * from cross_postpone"))
+                .containsExactlyInAnyOrder(Row.of(1, 1, 1), Row.of(2, 2, 2));
+    }
 }
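The postpone-bucket test above reflects that data written with 'bucket' = '-2' only becomes readable after a
compaction assigns real buckets; in plain Flink SQL the same flow looks roughly like this (names follow the test):

```sql
CREATE TABLE cross_postpone (pt INT, k INT, v INT, PRIMARY KEY (k) NOT ENFORCED)
    PARTITIONED BY (pt) WITH ('bucket' = '-2');

INSERT INTO cross_postpone VALUES (1, 1, 1);
INSERT INTO cross_postpone VALUES (2, 2, 2);

-- newly written data becomes readable only after compaction
CALL sys.compact(`table` => 'default.cross_postpone');

SELECT * FROM cross_postpone;
```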
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
index bad5998e39..fdd12ae2a7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
@@ -254,7 +254,7 @@ public class FlinkJobRecoveryITCase extends CatalogITCaseBase {
                                 "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k, pt) NOT ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')",
                                 tableName));
                 return;
-            case CROSS_PARTITION:
+            case KEY_DYNAMIC:
                 batchSql(
                         String.format(
                                 "CREATE TABLE IF NOT EXISTS `%s` (k INT, f1 STRING, pt STRING, PRIMARY KEY(k) NOT ENFORCED) WITH ('bucket'='-1', 'commit.force-create-snapshot'='true')",
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 70f4676cad..2f6b743f5c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -88,7 +88,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
 
     val withInitBucketCol = bucketMode match {
       case BUCKET_UNAWARE => data
-      case CROSS_PARTITION if !data.schema.fieldNames.contains(ROW_KIND_COL) =>
+      case KEY_DYNAMIC if !data.schema.fieldNames.contains(ROW_KIND_COL) =>
         data
           .withColumn(ROW_KIND_COL, lit(RowKind.INSERT.toByteValue))
           .withColumn(BUCKET_COL, lit(-1))
@@ -165,7 +165,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
     }
 
     val written: Dataset[Array[Byte]] = bucketMode match {
-      case CROSS_PARTITION =>
+      case KEY_DYNAMIC =>
         // Topology: input -> bootstrap -> shuffle by key hash -> bucket-assigner -> shuffle by partition & bucket
         val rowType = SparkTypeUtils.toPaimonType(withInitBucketCol.schema).asInstanceOf[RowType]
         val assignerParallelism = Option(coreOptions.dynamicBucketAssignerParallelism)
