This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new b65dc1d16b [core] Fix default value not work for primary key table with bucket -1 (#6050)
b65dc1d16b is described below

commit b65dc1d16b9da8401a17b45f53b183ee2678ac7d
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Mon Aug 11 15:45:36 2025 +0800

    [core] Fix default value not work for primary key table with bucket -1 (#6050)
---
 .../java/org/apache/paimon/table/sink/TableWriteImpl.java | 15 ++++-----------
 .../java/org/apache/paimon/flink/BranchSqlITCase.java | 9 +++++++++
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index aba740fe0f..9d3f4e8b51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -171,26 +171,19 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
 
     @Nullable
     public SinkRecord writeAndReturn(InternalRow row) throws Exception {
-        checkNullability(row);
-        row = wrapDefaultValue(row);
-        RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
-        if (ignoreDelete && rowKind.isRetract()) {
-            return null;
-        }
-        SinkRecord record = toSinkRecord(row);
-        write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind));
-        return record;
+        return writeAndReturn(row, -1);
     }
 
     @Nullable
     public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
         checkNullability(row);
+        row = wrapDefaultValue(row);
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
         if (ignoreDelete && rowKind.isRetract()) {
             return null;
         }
-        SinkRecord record = toSinkRecord(row, bucket);
-        write.write(record.partition(), bucket, recordExtractor.extract(record, rowKind));
+        SinkRecord record = bucket == -1 ? toSinkRecord(row) : toSinkRecord(row, bucket);
+        write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind));
         return record;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 2160cb1a2d..dd7750b2f0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -52,6 +52,15 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
     }
 
+    @Test
+    public void testDefaultValueForPkTableDynamicBucket() throws Exception {
+        sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT)");
+        sql("CALL sys.alter_column_default_value('default.T', 'b', '5')");
+        sql("INSERT INTO T (a) VALUES (1), (2)");
+        assertThat(collectResult("SELECT * FROM T"))
+                .containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
+    }
+
     @Test
     public void testAlterBranchTable() throws Exception {
         sql(