This is an automated email from the ASF dual-hosted git repository. suxiaogang223 pushed a commit to branch codex/paimon-jni-write in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6bff4f26319d7f7028b110f7196cc9143606c7bc Author: suxiaogang <[email protected]> AuthorDate: Fri Jul 3 16:52:18 2026 +0800 [test](regression) Add Spark comparison for Paimon write regression ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Paimon write regression cases compared results only through fixed Doris-side expected-output checks (`order_qt_*`), which made Spark-side consistency coverage harder to maintain. This change switches the Paimon write regression case to the Spark/Doris result comparison helpers, enables Paimon external table insert routing through Nereids, and passes full sink output column names to the BE Paimon JNI writer so partial inserts can write rows with default-filled columns correctly. ### Release note None ### Check List (For Author) - Test: Regression test - `./run-regression-test.sh --run -d external_table_p0/paimon/write -s test_paimon_write_basic` - Behavior changed: Yes (INSERT into Paimon external tables is now routed through Nereids via UnboundPaimonTableSink, and the BE Paimon JNI writer names output columns after the target table's full schema) - Does this need documentation: No --- .../writer/paimon/vpaimon_jni_table_writer.cpp | 29 ++++ .../nereids/analyzer/UnboundTableSinkCreator.java | 3 + .../trees/plans/commands/insert/InsertUtils.java | 3 + .../org/apache/doris/planner/PaimonTableSink.java | 16 +++ .../paimon/write/test_paimon_write_basic.groovy | 155 +++++++-------------- 5 files changed, 101 insertions(+), 105 deletions(-) diff --git a/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp b/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp index 76409281944..78fc5965421 100644 --- a/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp +++ b/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp @@ -23,6 +23,7 @@ #include <arrow/record_batch.h> #include <arrow/type.h> +#include "common/check.h" #include "core/block/block.h" #include "format/arrow/arrow_block_convertor.h" #include "format/arrow/arrow_row_batch.h" @@ -36,6 +37,23 @@ namespace doris { const std::string PAIMON_JNI_CLASS = "org/apache/doris/paimon/PaimonJniWriter"; +const std::string 
PAIMON_OUTPUT_COLUMN_NAMES = "doris.output_column_names"; +const char PAIMON_COLUMN_NAME_SEPARATOR = '\x01'; + +std::vector<std::string> split_paimon_output_column_names(const std::string& column_names) { + std::vector<std::string> names; + size_t begin = 0; + while (begin <= column_names.size()) { + size_t end = column_names.find(PAIMON_COLUMN_NAME_SEPARATOR, begin); + if (end == std::string::npos) { + names.emplace_back(column_names.substr(begin)); + break; + } + names.emplace_back(column_names.substr(begin, end - begin)); + begin = end + 1; + } + return names; +} VPaimonJniTableWriter::VPaimonJniTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) @@ -212,6 +230,17 @@ Status VPaimonJniTableWriter::write(RuntimeState* state, ::doris::Block& block) SCOPED_TIMER(_project_timer); RETURN_IF_ERROR(_projection_block(block, &output_block)); } + const auto& paimon_sink = _t_sink.paimon_table_sink; + if (paimon_sink.__isset.paimon_options) { + auto column_names = paimon_sink.paimon_options.find(PAIMON_OUTPUT_COLUMN_NAMES); + if (column_names != paimon_sink.paimon_options.end()) { + auto output_column_names = split_paimon_output_column_names(column_names->second); + DORIS_CHECK(output_column_names.size() == output_block.columns()); + for (size_t i = 0; i < output_column_names.size(); ++i) { + output_block.get_by_position(i).name = output_column_names[i]; + } + } + } if (output_block.rows() >= _batch_max_rows || output_block.bytes() >= _batch_max_bytes) { RETURN_IF_ERROR(_flush_buffer()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 31e291be216..6809bbdcfce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -149,6 +149,9 @@ public class 
UnboundTableSinkCreator { } else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) { return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan, staticPartitionKeyValues, false); + } else if (curCatalog instanceof PaimonExternalCatalog && !isAutoDetectPartition) { + return new UnboundPaimonTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof MaxComputeExternalCatalog && !isAutoDetectPartition) { return new UnboundMaxComputeTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan, staticPartitionKeyValues); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index fa5e34046d1..4400e0b00c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -42,6 +42,7 @@ import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; import org.apache.doris.nereids.analyzer.UnboundInlineTable; import org.apache.doris.nereids.analyzer.UnboundMaxComputeTableSink; +import org.apache.doris.nereids.analyzer.UnboundPaimonTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundStar; import org.apache.doris.nereids.analyzer.UnboundTableSink; @@ -600,6 +601,8 @@ public class InsertUtils { unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan; } else if (plan instanceof UnboundIcebergTableSink) { unboundTableSink = (UnboundIcebergTableSink<? 
extends Plan>) plan; + } else if (plan instanceof UnboundPaimonTableSink) { + unboundTableSink = (UnboundPaimonTableSink<? extends Plan>) plan; } else if (plan instanceof UnboundDictionarySink) { unboundTableSink = (UnboundDictionarySink<? extends Plan>) plan; } else if (plan instanceof UnboundBlackholeSink) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java index 581813e7ed8..bec43606d18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java @@ -56,6 +56,8 @@ import java.util.Set; */ public class PaimonTableSink extends BaseExternalTableDataSink { private static final Logger LOG = LogManager.getLogger(PaimonTableSink.class); + private static final String OUTPUT_COLUMN_NAMES = "doris.output_column_names"; + private static final String COLUMN_NAME_SEPARATOR = "\u0001"; private final PaimonExternalTable targetTable; private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding(); private List<Expr> outputExprs; @@ -123,6 +125,7 @@ public class PaimonTableSink extends BaseExternalTableDataSink { paimonOptions.put("doris.commit_user", ctx.getCommitUser()); } } + paimonOptions.put(OUTPUT_COLUMN_NAMES, String.join(COLUMN_NAME_SEPARATOR, outputColumnNames())); if (ConnectContext.get() != null) { String hadoopUser = hadoopConfig.get("hadoop.username"); @@ -261,4 +264,17 @@ public class PaimonTableSink extends BaseExternalTableDataSink { options.put(key, value); } } + + private List<String> outputColumnNames() throws AnalysisException { + List<Column> fullSchema = targetTable.getFullSchema(); + if (fullSchema.size() != outputExprs.size()) { + throw new AnalysisException("Paimon sink output column size mismatch, schema size=" + + fullSchema.size() + ", output expr size=" + outputExprs.size()); + } + ArrayList<String> 
outputColumnNames = new ArrayList<>(fullSchema.size()); + for (Column column : fullSchema) { + outputColumnNames.add(column.getName()); + } + return outputColumnNames; + } } diff --git a/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy b/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy index 8726c0ddc02..abe21b04af4 100644 --- a/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy +++ b/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy @@ -26,9 +26,34 @@ suite("test_paimon_write_basic", "basic,external") { String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") String catalogName = "test_paimon_write_basic" String dbName = "paimon_write_basic" + def tableNames = [ + "append_basic", + "append_partitioned", + "append_partial", + "append_from_select", + "fixed_default_bucket", + "fixed_mod_bucket", + "pk_fixed_bucket", + "pk_partial_reject", + "dynamic_bucket_reject", + "insert_overwrite_reject" + ] + String dropPaimonTables = tableNames.collect { + "DROP TABLE IF EXISTS paimon.${dbName}.${it};" + }.join("\n") - def sparkSql = { String sqlText -> - spark_paimon sqlText, 180 + def assertPaimonDorisQueryEquals = { String tableName, String selectList, String orderBy -> + def sparkRows = spark_paimon """ + SELECT ${selectList} + FROM paimon.${dbName}.${tableName} + ORDER BY ${orderBy} + """ + def dorisRows = sql """ + SELECT ${selectList} + FROM ${tableName} + ORDER BY ${orderBy} + """ + assertSparkDorisResultEquals(sparkRows, dorisRows) } sql """drop catalog if exists ${catalogName}""" @@ -45,24 +70,12 @@ suite("test_paimon_write_basic", "basic,external") { """ try { - sparkSql """CREATE DATABASE IF NOT EXISTS paimon.${dbName}""" - [ - "append_basic", - "append_partitioned", - "append_partial", - "append_from_select", - "fixed_default_bucket", - "fixed_mod_bucket", - "pk_fixed_bucket", - "pk_partial_reject", - 
"dynamic_bucket_reject", - "insert_overwrite_reject" - ].each { tableName -> - sparkSql """DROP TABLE IF EXISTS paimon.${dbName}.${tableName}""" - } + spark_paimon_multi """ + CREATE DATABASE IF NOT EXISTS paimon.${dbName}; + ${dropPaimonTables} + """ - // Case 1: append-only bucket-unaware table, covering full-row INSERT VALUES and basic scalar types. - sparkSql """ + spark_paimon_multi """ CREATE TABLE paimon.${dbName}.append_basic ( id INT, name STRING, @@ -74,11 +87,8 @@ suite("test_paimon_write_basic", "basic,external") { ) USING paimon TBLPROPERTIES ( 'bucket' = '-1' - ) - """ + ); - // Case 2: append-only partitioned table, covering dynamic partition routing during write. - sparkSql """ CREATE TABLE paimon.${dbName}.append_partitioned ( id INT, name STRING, @@ -88,11 +98,8 @@ suite("test_paimon_write_basic", "basic,external") { PARTITIONED BY (pt) TBLPROPERTIES ( 'bucket' = '-1' - ) - """ + ); - // Case 3: append-only table used to verify partial column INSERT. - sparkSql """ CREATE TABLE paimon.${dbName}.append_partial ( id INT, name STRING, @@ -101,11 +108,8 @@ suite("test_paimon_write_basic", "basic,external") { ) USING paimon TBLPROPERTIES ( 'bucket' = '-1' - ) - """ + ); - // Case 4: append-only table used to verify INSERT INTO SELECT, not only INSERT VALUES. - sparkSql """ CREATE TABLE paimon.${dbName}.append_from_select ( id INT, name STRING, @@ -114,11 +118,8 @@ suite("test_paimon_write_basic", "basic,external") { ) USING paimon TBLPROPERTIES ( 'bucket' = '-1' - ) - """ + ); - // Case 5: fixed bucket table using Paimon's default bucket function. - sparkSql """ CREATE TABLE paimon.${dbName}.fixed_default_bucket ( id INT, name STRING, @@ -129,11 +130,8 @@ suite("test_paimon_write_basic", "basic,external") { TBLPROPERTIES ( 'bucket' = '4', 'bucket-key' = 'id' - ) - """ + ); - // Case 6: fixed bucket table using Paimon's MOD bucket function. 
- sparkSql """ CREATE TABLE paimon.${dbName}.fixed_mod_bucket ( id BIGINT, name STRING, @@ -145,11 +143,8 @@ suite("test_paimon_write_basic", "basic,external") { 'bucket' = '4', 'bucket-key' = 'id', 'bucket-function.type' = 'MOD' - ) - """ + ); - // Case 7: primary-key fixed bucket table, covering full-row upsert semantics. - sparkSql """ CREATE TABLE paimon.${dbName}.pk_fixed_bucket ( id INT, name STRING, @@ -161,11 +156,8 @@ suite("test_paimon_write_basic", "basic,external") { 'primary-key' = 'pt,id', 'bucket' = '4', 'bucket-key' = 'id' - ) - """ + ); - // Case 8: primary-key table used to verify partial column writes are rejected. - sparkSql """ CREATE TABLE paimon.${dbName}.pk_partial_reject ( id INT, name STRING, @@ -175,11 +167,8 @@ suite("test_paimon_write_basic", "basic,external") { 'primary-key' = 'id', 'bucket' = '2', 'bucket-key' = 'id' - ) - """ + ); - // Case 9: dynamic bucket primary-key table used to verify unsupported bucket modes are rejected. - sparkSql """ CREATE TABLE paimon.${dbName}.dynamic_bucket_reject ( id INT, name STRING, @@ -188,18 +177,15 @@ suite("test_paimon_write_basic", "basic,external") { TBLPROPERTIES ( 'primary-key' = 'id', 'bucket' = '-1' - ) - """ + ); - // Case 10: append-only table used to verify Paimon INSERT OVERWRITE is still unsupported. 
- sparkSql """ CREATE TABLE paimon.${dbName}.insert_overwrite_reject ( id INT, name STRING ) USING paimon TBLPROPERTIES ( 'bucket' = '-1' - ) + ); """ sql """refresh catalog ${catalogName}""" @@ -212,11 +198,7 @@ suite("test_paimon_write_basic", "basic,external") { (2, 'bob', 200, false, DATE '2026-01-02', TIMESTAMP '2026-01-02 11:30:00', 20.75), (3, 'carol', NULL, true, NULL, NULL, NULL) """ - order_qt_append_basic """ - SELECT id, name, score, flag, dt, ts, amount - FROM append_basic - ORDER BY id - """ + assertPaimonDorisQueryEquals("append_basic", "id, name, score, flag, dt, ts, amount", "id") sql """ INSERT INTO append_partitioned VALUES @@ -225,22 +207,14 @@ suite("test_paimon_write_basic", "basic,external") { (3, 'c', 30, 'p2'), (4, 'd', 40, 'p2') """ - order_qt_append_partitioned """ - SELECT pt, id, name, score - FROM append_partitioned - ORDER BY pt, id - """ + assertPaimonDorisQueryEquals("append_partitioned", "pt, id, name, score", "pt, id") sql """ INSERT INTO append_partial(id, name) VALUES (1, 'partial-a'), (2, 'partial-b') """ - order_qt_append_partial """ - SELECT id, name, score, note - FROM append_partial - ORDER BY id - """ + assertPaimonDorisQueryEquals("append_partial", "id, name, score, note", "id") sql """ INSERT INTO append_from_select @@ -248,11 +222,7 @@ suite("test_paimon_write_basic", "basic,external") { FROM append_partitioned WHERE pt = 'p1' """ - order_qt_append_from_select """ - SELECT pt, id, name, score - FROM append_from_select - ORDER BY pt, id - """ + assertPaimonDorisQueryEquals("append_from_select", "pt, id, name, score", "pt, id") sql """ INSERT INTO fixed_default_bucket VALUES @@ -262,11 +232,7 @@ suite("test_paimon_write_basic", "basic,external") { (4, 'default-d', 40, 'p2'), (-5, 'default-negative', 50, 'p2') """ - order_qt_fixed_default_bucket """ - SELECT pt, id, name, score - FROM fixed_default_bucket - ORDER BY pt, id - """ + assertPaimonDorisQueryEquals("fixed_default_bucket", "pt, id, name, score", "pt, id") sql """ 
INSERT INTO fixed_mod_bucket VALUES @@ -275,11 +241,7 @@ suite("test_paimon_write_basic", "basic,external") { (5, 'mod-b', 30, 'p2'), (-6, 'mod-negative-b', 40, 'p2') """ - order_qt_fixed_mod_bucket """ - SELECT pt, id, name, score - FROM fixed_mod_bucket - ORDER BY pt, id - """ + assertPaimonDorisQueryEquals("fixed_mod_bucket", "pt, id, name, score", "pt, id") sql """ INSERT INTO pk_fixed_bucket VALUES @@ -291,11 +253,7 @@ suite("test_paimon_write_basic", "basic,external") { INSERT INTO pk_fixed_bucket VALUES (1, 'pk-a-updated', 30, 'p1') """ - order_qt_pk_fixed_bucket """ - SELECT pt, id, name, score - FROM pk_fixed_bucket - ORDER BY pt, id - """ + assertPaimonDorisQueryEquals("pk_fixed_bucket", "pt, id, name, score", "pt, id") test { sql """ @@ -322,20 +280,7 @@ suite("test_paimon_write_basic", "basic,external") { } } finally { try { - [ - "append_basic", - "append_partitioned", - "append_partial", - "append_from_select", - "fixed_default_bucket", - "fixed_mod_bucket", - "pk_fixed_bucket", - "pk_partial_reject", - "dynamic_bucket_reject", - "insert_overwrite_reject" - ].each { tableName -> - sparkSql """DROP TABLE IF EXISTS paimon.${dbName}.${tableName}""" - } + spark_paimon_multi dropPaimonTables } catch (Exception e) { logger.warn("Failed to drop Paimon write basic test tables: ${e.message}".toString()) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
