This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 1bb2f6027f38d41dd9fe9997a8f99b1eaeb9e7e8 Author: Xiduo You <[email protected]> AuthorDate: Mon May 12 12:17:14 2025 +0800 [core] Fix dynamic insert into table with partition columns contain primary key error (#5588) --- .../paimon/crosspartition/GlobalIndexAssigner.java | 6 ++-- .../paimon/crosspartition/IndexBootstrap.java | 13 ++++---- .../KeyPartPartitionKeyExtractor.java | 5 ++- .../sink/RowPartitionAllPrimaryKeyExtractor.java} | 37 ++++++++++------------ .../paimon/crosspartition/IndexBootstrapTest.java | 2 +- .../spark/sql/InsertOverwriteTableTestBase.scala | 23 ++++++++++++++ 6 files changed, 52 insertions(+), 34 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java index 4700e73998..5ed13b4fce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java @@ -40,7 +40,7 @@ import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.PartitionKeyExtractor; -import org.apache.paimon.table.sink.RowPartitionKeyExtractor; +import org.apache.paimon.table.sink.RowPartitionAllPrimaryKeyExtractor; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -129,7 +129,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable { CoreOptions coreOptions = table.coreOptions(); this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum(); - this.extractor = new RowPartitionKeyExtractor(table.schema()); + this.extractor = new RowPartitionAllPrimaryKeyExtractor(table.schema()); this.keyPartExtractor = new KeyPartPartitionKeyExtractor(table.schema()); // state @@ -149,7 +149,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable { path.toString(), rocksdbOptions, coreOptions.crossPartitionUpsertIndexTtl()); - RowType keyType = table.schema().logicalTrimmedPrimaryKeysType(); + RowType keyType = table.schema().logicalPrimaryKeysType(); this.keyIndex = stateFactory.valueState( INDEX_NAME, diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java index ec3ab25ebd..b8f13594a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java @@ -26,7 +26,7 @@ import org.apache.paimon.data.JoinedRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.Table; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.ReadBuilder; @@ -58,9 +58,9 @@ public class IndexBootstrap implements Serializable { public static final String BUCKET_FIELD = "_BUCKET"; - private final Table table; + private final FileStoreTable table; - public IndexBootstrap(Table table) { + public IndexBootstrap(FileStoreTable table) { this.table = table; } @@ -71,9 +71,11 @@ public class IndexBootstrap implements Serializable { public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throws IOException { RowType rowType = table.rowType(); + List<String> fieldNames = rowType.getFieldNames(); + // Use `trimmedPrimaryKeys` to reduce data size since we will add partition at the end. int[] keyProjection = - table.primaryKeys().stream() + table.schema().trimmedPrimaryKeys().stream() .map(fieldNames::indexOf) .mapToInt(Integer::intValue) .toArray(); @@ -136,13 +138,12 @@ public class IndexBootstrap implements Serializable { } public static RowType bootstrapType(TableSchema schema) { - List<String> primaryKeys = schema.primaryKeys(); + List<String> primaryKeys = schema.trimmedPrimaryKeys(); List<String> partitionKeys = schema.partitionKeys(); List<DataField> bootstrapFields = new ArrayList<>( schema.projectedLogicalRowType( Stream.concat(primaryKeys.stream(), partitionKeys.stream()) - .distinct() .collect(Collectors.toList())) .getFields()); bootstrapFields.add( diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java index 5abfbfffbc..3d404cde15 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java @@ -37,14 +37,13 @@ public class KeyPartPartitionKeyExtractor implements PartitionKeyExtractor<Inter private final Projection keyProjection; public KeyPartPartitionKeyExtractor(TableSchema schema) { - List<String> primaryKeys = schema.primaryKeys(); List<String> partitionKeys = schema.partitionKeys(); RowType keyPartType = schema.projectedLogicalRowType( - Stream.concat(primaryKeys.stream(), partitionKeys.stream()) + Stream.concat(schema.trimmedPrimaryKeys().stream(), partitionKeys.stream()) .collect(Collectors.toList())); this.partitionProjection = CodeGenUtils.newProjection(keyPartType, partitionKeys); - this.keyProjection = CodeGenUtils.newProjection(keyPartType, primaryKeys); + this.keyProjection = CodeGenUtils.newProjection(keyPartType, schema.primaryKeys()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java similarity index 54% copy from paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java copy to paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java index 5abfbfffbc..3dceaf8253 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java @@ -16,35 +16,30 @@ * limitations under the License. */ -package org.apache.paimon.crosspartition; +package org.apache.paimon.table.sink; import org.apache.paimon.codegen.CodeGenUtils; import org.apache.paimon.codegen.Projection; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.sink.PartitionKeyExtractor; -import org.apache.paimon.types.RowType; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** A {@link PartitionKeyExtractor} to {@link InternalRow} with only key and partition fields. */ -public class KeyPartPartitionKeyExtractor implements PartitionKeyExtractor<InternalRow> { +/** + * A {@link PartitionKeyExtractor} to {@link InternalRow}, the `trimmedPrimaryKey` would return all + * primary keys. + */ +public class RowPartitionAllPrimaryKeyExtractor implements PartitionKeyExtractor<InternalRow> { private final Projection partitionProjection; - private final Projection keyProjection; - - public KeyPartPartitionKeyExtractor(TableSchema schema) { - List<String> primaryKeys = schema.primaryKeys(); - List<String> partitionKeys = schema.partitionKeys(); - RowType keyPartType = - schema.projectedLogicalRowType( - Stream.concat(primaryKeys.stream(), partitionKeys.stream()) - .collect(Collectors.toList())); - this.partitionProjection = CodeGenUtils.newProjection(keyPartType, partitionKeys); - this.keyProjection = CodeGenUtils.newProjection(keyPartType, primaryKeys); + private final Projection primaryKeyProjection; + + public RowPartitionAllPrimaryKeyExtractor(TableSchema schema) { + partitionProjection = + CodeGenUtils.newProjection( + schema.logicalRowType(), schema.projection(schema.partitionKeys())); + primaryKeyProjection = + CodeGenUtils.newProjection( + schema.logicalRowType(), schema.projection(schema.primaryKeys())); } @Override @@ -54,6 +49,6 @@ public class KeyPartPartitionKeyExtractor implements PartitionKeyExtractor<Inter @Override public BinaryRow trimmedPrimaryKey(InternalRow record) { - return keyProjection.apply(record); + return primaryKeyProjection.apply(record); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index 27fa311ddb..c9a4dacb90 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -68,7 +68,7 @@ public class IndexBootstrapTest extends TableTestBase { row(3, 6, 6, 7), row(3, 7, 7, 8)); - IndexBootstrap indexBootstrap = new IndexBootstrap(table); + IndexBootstrap indexBootstrap = new IndexBootstrap((FileStoreTable) table); List<GenericRow> result = new ArrayList<>(); Consumer<InternalRow> consumer = row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1), row.getInt(2))); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala index b7e0b35dd1..e6e17d4848 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala @@ -608,4 +608,27 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase { } } } + + test("Paimon Insert: dynamic insert into table with partition columns contain primary key") { + withSQLConf("spark.sql.shuffle.partitions" -> "10") { + withTable("pk_pt") { + sql(""" + |create table pk_pt (c1 int) partitioned by(p1 string, p2 string) + |tblproperties('primary-key'='c1, p1') + |""".stripMargin) + + sql("insert into table pk_pt partition(p1, p2) values(1, 'a', 'b'), (1, 'b', 'b')") + checkAnswer( + sql("select * from pk_pt"), + Seq(Row(1, "a", "b"), Row(1, "b", "b")) + ) + + sql("insert into table pk_pt partition(p1, p2) values(1, 'a', 'b'), (1, 'c', 'c')") + checkAnswer( + sql("select * from pk_pt order by c1, p1, p2"), + Seq(Row(1, "a", "b"), Row(1, "b", "b"), Row(1, "c", "c")) + ) + } + } + } }
