This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 60af658fd1f [fix](BrokerLoad && RoutineLoad) Fix the load attributes 
and table attributes conflict check (#58054)
60af658fd1f is described below

commit 60af658fd1f43c4bb95c76fc79791b01140dba9c
Author: Refrain <[email protected]>
AuthorDate: Wed Dec 10 11:26:49 2025 +0800

    [fix](BrokerLoad && RoutineLoad) Fix the load attributes and table 
attributes conflict check (#58054)
    
    In a Routine Load job, if the `load_to_single_tablet` property is set
    for a table with a `hash distribution` scheme, the load job should be
    canceled.
    
    Similarly, in a Broker Load job with the same configuration, the job
    should also fail.
---
 contrib/faiss                                      |   2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |   7 +
 .../doris/nereids/load/NereidsBrokerLoadTask.java  |   7 +-
 .../nereids/load/NereidsLoadingTaskPlanner.java    |   2 +-
 .../nereids/trees/plans/commands/LoadCommand.java  |  11 ++
 .../plans/commands/insert/OlapInsertExecutor.java  |   2 +
 .../org/apache/doris/planner/OlapTableSink.java    |   5 +
 .../load_ddl/test_load_single_tablet.groovy        | 156 +++++++++++++++++++++
 .../routine_load/ddl/dup_tbl_basic_create.sql      |   2 +-
 9 files changed, 189 insertions(+), 5 deletions(-)

diff --git a/contrib/faiss b/contrib/faiss
index 0276f0bb7e4..eb245ae0bff 160000
--- a/contrib/faiss
+++ b/contrib/faiss
@@ -1 +1 @@
-Subproject commit 0276f0bb7e4fc0694fe44f450f4f5cfbcb4dd5cc
+Subproject commit eb245ae0bffb7f871c08c2f2db6ca9d01311a9f4
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 9324ca4a87c..7fccb0ae0e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.Config;
@@ -511,6 +512,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         } else {
             OlapTable olapTable = 
db.getOlapTableOrDdlException(info.getTableName());
             checkMeta(olapTable, info.getRoutineLoadDesc());
+            // check load_to_single_tablet compatibility with distribution type
+            if (info.isLoadToSingleTablet()
+                    && !(olapTable.getDefaultDistributionInfo() instanceof 
RandomDistributionInfo)) {
+                throw new DdlException(
+                        "if load_to_single_tablet set to true, the olap table 
must be with random distribution");
+            }
             long tableId = olapTable.getId();
             // init kafka routine load job
             kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, info.getName(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
index 6a48ac75114..504a8173539 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
@@ -36,18 +36,21 @@ public class NereidsBrokerLoadTask implements 
NereidsLoadTaskInfo {
     private int sendBatchParallelism;
     private boolean strictMode;
     private boolean memtableOnSinkNode;
+    private boolean loadToSingleTablet;
     private PartitionNamesInfo partitionNamesInfo;
 
     /**
      * NereidsBrokerLoadTask
      */
     public NereidsBrokerLoadTask(long txnId, int timeout, int 
sendBatchParallelism,
-            boolean strictMode, boolean memtableOnSinkNode, PartitionNamesInfo 
partitions) {
+            boolean strictMode, boolean memtableOnSinkNode, boolean 
loadToSingleTablet,
+            PartitionNamesInfo partitions) {
         this.txnId = txnId;
         this.timeout = timeout;
         this.sendBatchParallelism = sendBatchParallelism;
         this.strictMode = strictMode;
         this.memtableOnSinkNode = memtableOnSinkNode;
+        this.loadToSingleTablet = loadToSingleTablet;
         this.partitionNamesInfo = partitions;
     }
 
@@ -173,7 +176,7 @@ public class NereidsBrokerLoadTask implements 
NereidsLoadTaskInfo {
 
     @Override
     public boolean isLoadToSingleTablet() {
-        return false;
+        return loadToSingleTablet;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
index 13156d771e8..d8692ac24e9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
@@ -162,7 +162,7 @@ public class NereidsLoadingTaskPlanner {
         }
         NereidsBrokerLoadTask nereidsBrokerLoadTask = new 
NereidsBrokerLoadTask(txnId, (int) txnTimeout,
                 sendBatchParallelism,
-                strictMode, enableMemtableOnSinkNode, partitionNamesInfo);
+                strictMode, enableMemtableOnSinkNode, singleTabletLoadPerSink, 
partitionNamesInfo);
 
         TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor();
         scanTupleDesc.setTable(table);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index aae70d34376..117f0dd3bfb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
@@ -416,6 +417,16 @@ public class LoadCommand extends Command implements 
NeedAuditEncryption, Forward
             if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND && 
!table.hasDeleteSign()) {
                 throw new AnalysisException("load by MERGE or DELETE need to 
upgrade table to support batch delete.");
             }
+            if (properties != null) {
+                String loadToSingleTablet = 
properties.get(LOAD_TO_SINGLE_TABLET);
+                if (loadToSingleTablet != null && 
loadToSingleTablet.equalsIgnoreCase("true")) {
+                    if (!(table.getDefaultDistributionInfo() instanceof 
RandomDistributionInfo)) {
+                        throw new AnalysisException(
+                                "if load_to_single_tablet set to true, "
+                                + "the olap table must be with random 
distribution");
+                    }
+                }
+            }
             if (brokerDesc != null && !brokerDesc.isMultiLoadBroker()) {
                 for (int i = 0; i < dataDescription.getFilePaths().size(); 
i++) {
                     String location = 
brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 453e102aa72..550844bd3ec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -135,6 +135,8 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
         try {
             // TODO refactor this to avoid call legacy planner's function
             long timeout = getTimeout();
+            // TODO: For Insert Into with S3/HDFS TVF, need to get 
load_to_single_tablet from TVF properties
+            // Currently hardcoded to false, which bypasses the check in 
OlapTableSink.init()
             olapTableSink.init(ctx.queryId(), txnId, database.getId(),
                     timeout,
                     ctx.getSessionVariable().getSendBatchParallelism(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 3e51feb703c..421bc131d6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -160,6 +160,11 @@ public class OlapTableSink extends DataSink {
                 : false);
         this.isStrictMode = isStrictMode;
         this.txnId = txnId;
+        // (Note) This code may need to be removed because:
+        // 1. For broker load and routine load, this check has already been 
moved ahead to the stage
+        //    before the import task is started.
+        // 2. For other import methods, the loadToSingleTablet mode is 
actually not enabled,
+        //    as it is hardcoded as false in the code.
         if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() 
instanceof RandomDistributionInfo)) {
             throw new AnalysisException(
                     "if load_to_single_tablet set to true," + " the olap table 
must be with random distribution");
diff --git 
a/regression-test/suites/load_p0/load_ddl/test_load_single_tablet.groovy 
b/regression-test/suites/load_p0/load_ddl/test_load_single_tablet.groovy
new file mode 100644
index 00000000000..c31ccf1b369
--- /dev/null
+++ b/regression-test/suites/load_p0/load_ddl/test_load_single_tablet.groovy
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_load_single_tablet", "p0") {
+    def tableName = "test_load_single_tablet_table"
+    def s3BucketName = getS3BucketName()
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+    def ak = getS3AK()
+    def sk = getS3SK()
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    
+    sql """
+        CREATE TABLE ${tableName} (
+            user_id            BIGINT       NOT NULL COMMENT "user id",
+            name               VARCHAR(20)           COMMENT "name",
+            age                INT                   COMMENT "age"
+        )
+        UNIQUE KEY(user_id)
+        DISTRIBUTED BY HASH(user_id) BUCKETS 10
+        PROPERTIES("replication_num" = "1");
+        """
+
+    // Test Broker Load with load_to_single_tablet on HASH distribution table 
(should fail)
+    try {
+        def label1 = "test_load_single_tablet_" + System.currentTimeMillis()
+        sql """
+            LOAD LABEL ${label1} (
+                DATA INFILE("s3://${s3BucketName}/load/jira/load22282.csv")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                (user_id, name, age)
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "${ak}",
+                "AWS_SECRET_KEY" = "${sk}",
+                "AWS_ENDPOINT" = "${s3Endpoint}",
+                "AWS_REGION" = "${s3Region}"
+            )
+            PROPERTIES (
+                "timeout" = "3600",
+                "load_to_single_tablet" = "true"
+            );
+            """
+
+        // Wait for the load job to be processed and check if it fails
+        def max_try_milli_secs = 60000
+        def foundExpectedError = false
+        while (max_try_milli_secs > 0) {
+            def result = sql "SHOW LOAD WHERE LABEL = '${label1}'"
+            if (result.size() > 0) {
+                def state = result[0][2]
+                def errorMsg = result[0][7]
+                logger.info("Load label: ${label1}, state: ${state}, errorMsg: 
${errorMsg}")
+                
+                if (state == "CANCELLED") {
+                    logger.info("Broker load failed as expected with error: 
${errorMsg}")
+                    foundExpectedError = true
+                    break
+                } else if (state == "FINISHED") {
+                    assertTrue(false, "Broker load should fail but succeeded")
+                }
+            }
+            
+            Thread.sleep(1000)
+            max_try_milli_secs -= 1000
+            if (max_try_milli_secs <= 0) {
+                assertTrue(false, "test_load_single_tablet broker load 
timeout: ${label1}")
+            }
+        }
+        
+        assertTrue(foundExpectedError, "Broker load should have failed with 
expected error")
+    } catch (Exception e) {
+        // It's also acceptable if the load fails at creation time
+        logger.info("Broker load failed at creation as expected: ${e.message}")
+        assertTrue(e.message.contains("if load_to_single_tablet set to true") 
|| 
+                  e.message.contains("the olap table must be with random 
distribution"),
+                  "Expected error message about random distribution, but got: 
${e.message}")
+    }
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def kafkaCsvTopic = "test_load_single_tablet_topic"
+        
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        def producer = new KafkaProducer<>(props)
+        
+        try {
+            def testData = "1,ykiko,20"
+            producer.send(new ProducerRecord<>(kafkaCsvTopic, null, testData))
+            producer.flush()
+            logger.info("Sent test data to Kafka topic ${kafkaCsvTopic}: 
${testData}")
+            
+            def label2 = "test_load_single_tablet2_" + 
System.currentTimeMillis()
+            
+            sql """
+                CREATE ROUTINE LOAD ${label2} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS(user_id, name, age)
+                PROPERTIES (
+                    "load_to_single_tablet" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            
+            assertTrue(false, "Routine load should fail when 
load_to_single_tablet=true with HASH distribution table")
+        } catch (Exception e) {
+            logger.info("Routine load failed as expected: ${e.message}")
+            assertTrue(e.message.contains("if load_to_single_tablet set to 
true, the olap table must be with random distribution"),
+                      "Expected error message: 'if load_to_single_tablet set 
to true, the olap table must be with random distribution', but got: 
${e.message}")
+        } finally {
+            if (producer != null) {
+                producer.close()
+            }
+        }
+    } else {
+        logger.info("Skip routine load test because enableKafkaTest is not 
enabled")
+    }
+    
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql 
b/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql
index 77cd41901d3..dba4eec1cde 100644
--- a/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql
+++ b/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql
@@ -58,7 +58,7 @@ PARTITION BY RANGE(k01)
     PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
     PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
 )
-DISTRIBUTED BY HASH(k00) BUCKETS 32
+DISTRIBUTED BY RANDOM BUCKETS 32
 PROPERTIES (
     "bloom_filter_columns"="k05",
     "replication_num" = "1"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to