This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 60af658fd1f [fix](BrokerLoad && RoutineLoad) Fix the load attributes and table attributes conflict check (#58054)
60af658fd1f is described below
commit 60af658fd1f43c4bb95c76fc79791b01140dba9c
Author: Refrain <[email protected]>
AuthorDate: Wed Dec 10 11:26:49 2025 +0800
[fix](BrokerLoad && RoutineLoad) Fix the load attributes and table attributes conflict check (#58054)
In a Routine Load job, if the `load_to_single_tablet` property is set to true for a table with a hash distribution scheme, the load job should be canceled. Similarly, a Broker Load job with the same configuration should also fail.
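
For illustration, a minimal sketch of the now-rejected configuration, using hypothetical table, label, and bucket names (the real coverage lives in the regression test below):

    CREATE TABLE t_hash (user_id BIGINT)
    UNIQUE KEY(user_id)
    DISTRIBUTED BY HASH(user_id) BUCKETS 10
    PROPERTIES ("replication_num" = "1");

    -- Broker Load now fails at job creation with:
    -- "if load_to_single_tablet set to true, the olap table must be with random distribution"
    LOAD LABEL demo_label (
        DATA INFILE("s3://my-bucket/data.csv")
        INTO TABLE t_hash
        COLUMNS TERMINATED BY ","
    )
    WITH S3 ("AWS_ENDPOINT" = "...", "AWS_REGION" = "...", "AWS_ACCESS_KEY" = "...", "AWS_SECRET_KEY" = "...")
    PROPERTIES ("load_to_single_tablet" = "true");

A Routine Load job that sets "load_to_single_tablet" = "true" on the same table is rejected with the same message when the job is created.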
---
contrib/faiss | 2 +-
.../load/routineload/KafkaRoutineLoadJob.java | 7 +
.../doris/nereids/load/NereidsBrokerLoadTask.java | 7 +-
.../nereids/load/NereidsLoadingTaskPlanner.java | 2 +-
.../nereids/trees/plans/commands/LoadCommand.java | 11 ++
.../plans/commands/insert/OlapInsertExecutor.java | 2 +
.../org/apache/doris/planner/OlapTableSink.java | 5 +
.../load_ddl/test_load_single_tablet.groovy | 156 +++++++++++++++++++++
.../routine_load/ddl/dup_tbl_basic_create.sql | 2 +-
9 files changed, 189 insertions(+), 5 deletions(-)
diff --git a/contrib/faiss b/contrib/faiss
index 0276f0bb7e4..eb245ae0bff 160000
--- a/contrib/faiss
+++ b/contrib/faiss
@@ -1 +1 @@
-Subproject commit 0276f0bb7e4fc0694fe44f450f4f5cfbcb4dd5cc
+Subproject commit eb245ae0bffb7f871c08c2f2db6ca9d01311a9f4
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 9324ca4a87c..7fccb0ae0e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
@@ -511,6 +512,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
} else {
            OlapTable olapTable = db.getOlapTableOrDdlException(info.getTableName());
checkMeta(olapTable, info.getRoutineLoadDesc());
+ // check load_to_single_tablet compatibility with distribution type
+            if (info.isLoadToSingleTablet()
+                    && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
+                throw new DdlException(
+                        "if load_to_single_tablet set to true, the olap table must be with random distribution");
+            }
long tableId = olapTable.getId();
// init kafka routine load job
kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, info.getName(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
index 6a48ac75114..504a8173539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
@@ -36,18 +36,21 @@ public class NereidsBrokerLoadTask implements NereidsLoadTaskInfo {
private int sendBatchParallelism;
private boolean strictMode;
private boolean memtableOnSinkNode;
+ private boolean loadToSingleTablet;
private PartitionNamesInfo partitionNamesInfo;
/**
* NereidsBrokerLoadTask
*/
    public NereidsBrokerLoadTask(long txnId, int timeout, int sendBatchParallelism,
-            boolean strictMode, boolean memtableOnSinkNode, PartitionNamesInfo partitions) {
+            boolean strictMode, boolean memtableOnSinkNode, boolean loadToSingleTablet,
+            PartitionNamesInfo partitions) {
this.txnId = txnId;
this.timeout = timeout;
this.sendBatchParallelism = sendBatchParallelism;
this.strictMode = strictMode;
this.memtableOnSinkNode = memtableOnSinkNode;
+ this.loadToSingleTablet = loadToSingleTablet;
this.partitionNamesInfo = partitions;
}
@@ -173,7 +176,7 @@ public class NereidsBrokerLoadTask implements NereidsLoadTaskInfo {
@Override
public boolean isLoadToSingleTablet() {
- return false;
+ return loadToSingleTablet;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
index 13156d771e8..d8692ac24e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
@@ -162,7 +162,7 @@ public class NereidsLoadingTaskPlanner {
}
        NereidsBrokerLoadTask nereidsBrokerLoadTask = new NereidsBrokerLoadTask(txnId, (int) txnTimeout, sendBatchParallelism,
-                strictMode, enableMemtableOnSinkNode, partitionNamesInfo);
+                strictMode, enableMemtableOnSinkNode, singleTabletLoadPerSink, partitionNamesInfo);
TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor();
scanTupleDesc.setTable(table);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index aae70d34376..117f0dd3bfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@@ -416,6 +417,16 @@ public class LoadCommand extends Command implements NeedAuditEncryption, Forward
            if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND && !table.hasDeleteSign()) {
                throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
}
+ if (properties != null) {
+                String loadToSingleTablet = properties.get(LOAD_TO_SINGLE_TABLET);
+                if (loadToSingleTablet != null && loadToSingleTablet.equalsIgnoreCase("true")) {
+                    if (!(table.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
+                        throw new AnalysisException(
+                                "if load_to_single_tablet set to true, "
+                                + "the olap table must be with random distribution");
+ }
+ }
+ }
if (brokerDesc != null && !brokerDesc.isMultiLoadBroker()) {
                for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
                    String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 453e102aa72..550844bd3ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -135,6 +135,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
try {
// TODO refactor this to avoid call legacy planner's function
long timeout = getTimeout();
+        // TODO: For Insert Into with S3/HDFS TVF, need to get load_to_single_tablet from TVF properties
+        // Currently hardcoded to false, which bypasses the check in OlapTableSink.init()
        olapTableSink.init(ctx.queryId(), txnId, database.getId(), timeout,
ctx.getSessionVariable().getSendBatchParallelism(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 3e51feb703c..421bc131d6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -160,6 +160,11 @@ public class OlapTableSink extends DataSink {
: false);
this.isStrictMode = isStrictMode;
this.txnId = txnId;
+ // (Note) This code may need to be removed because:
+        // 1. For broker load and routine load, this check has already been moved ahead to the stage
+        //    before the import task is started.
+        // 2. For other import methods, the loadToSingleTablet mode is actually not enabled,
+        //    as it is hardcoded as false in the code.
        if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
            throw new AnalysisException(
                    "if load_to_single_tablet set to true," + " the olap table must be with random distribution");
diff --git a/regression-test/suites/load_p0/load_ddl/test_load_single_tablet.groovy b/regression-test/suites/load_p0/load_ddl/test_load_single_tablet.groovy
new file mode 100644
index 00000000000..c31ccf1b369
--- /dev/null
+++ b/regression-test/suites/load_p0/load_ddl/test_load_single_tablet.groovy
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_load_single_tablet", "p0") {
+ def tableName = "test_load_single_tablet_table"
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+ def ak = getS3AK()
+ def sk = getS3SK()
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE ${tableName} (
+ user_id BIGINT NOT NULL COMMENT "user id",
+ name VARCHAR(20) COMMENT "name",
+ age INT COMMENT "age"
+ )
+ UNIQUE KEY(user_id)
+ DISTRIBUTED BY HASH(user_id) BUCKETS 10
+ PROPERTIES("replication_num" = "1");
+ """
+
+    // Test Broker Load with load_to_single_tablet on HASH distribution table (should fail)
+ try {
+ def label1 = "test_load_single_tablet_" + System.currentTimeMillis()
+ sql """
+ LOAD LABEL ${label1} (
+ DATA INFILE("s3://${s3BucketName}/load/jira/load22282.csv")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ","
+ FORMAT AS "CSV"
+ (user_id, name, age)
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "${ak}",
+ "AWS_SECRET_KEY" = "${sk}",
+ "AWS_ENDPOINT" = "${s3Endpoint}",
+ "AWS_REGION" = "${s3Region}"
+ )
+ PROPERTIES (
+ "timeout" = "3600",
+ "load_to_single_tablet" = "true"
+ );
+ """
+
+ // Wait for the load job to be processed and check if it fails
+ def max_try_milli_secs = 60000
+ def foundExpectedError = false
+ while (max_try_milli_secs > 0) {
+ def result = sql "SHOW LOAD WHERE LABEL = '${label1}'"
+ if (result.size() > 0) {
+ def state = result[0][2]
+ def errorMsg = result[0][7]
+                logger.info("Load label: ${label1}, state: ${state}, errorMsg: ${errorMsg}")
+
+ if (state == "CANCELLED") {
+                    logger.info("Broker load failed as expected with error: ${errorMsg}")
+ foundExpectedError = true
+ break
+ } else if (state == "FINISHED") {
+ assertTrue(false, "Broker load should fail but succeeded")
+ }
+ }
+
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if (max_try_milli_secs <= 0) {
+                assertTrue(false, "test_load_single_tablet broker load timeout: ${label1}")
+ }
+ }
+
+        assertTrue(foundExpectedError, "Broker load should have failed with expected error")
+ } catch (Exception e) {
+ // It's also acceptable if the load fails at creation time
+ logger.info("Broker load failed at creation as expected: ${e.message}")
+        assertTrue(e.message.contains("if load_to_single_tablet set to true") ||
+                e.message.contains("the olap table must be with random distribution"),
+                "Expected error message about random distribution, but got: ${e.message}")
+ }
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def kafkaCsvTopic = "test_load_single_tablet_topic"
+
+ def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+ def producer = new KafkaProducer<>(props)
+
+ try {
+ def testData = "1,ykiko,20"
+ producer.send(new ProducerRecord<>(kafkaCsvTopic, null, testData))
+ producer.flush()
+            logger.info("Sent test data to Kafka topic ${kafkaCsvTopic}: ${testData}")
+
+            def label2 = "test_load_single_tablet2_" + System.currentTimeMillis()
+
+ sql """
+ CREATE ROUTINE LOAD ${label2} ON ${tableName}
+ COLUMNS TERMINATED BY ",",
+ COLUMNS(user_id, name, age)
+ PROPERTIES (
+ "load_to_single_tablet" = "true"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopic}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+            assertTrue(false, "Routine load should fail when load_to_single_tablet=true with HASH distribution table")
+ } catch (Exception e) {
+ logger.info("Routine load failed as expected: ${e.message}")
+            assertTrue(e.message.contains("if load_to_single_tablet set to true, the olap table must be with random distribution"),
+                    "Expected error message: 'if load_to_single_tablet set to true, the olap table must be with random distribution', but got: ${e.message}")
+ } finally {
+ if (producer != null) {
+ producer.close()
+ }
+ }
+ } else {
+        logger.info("Skip routine load test because enableKafkaTest is not enabled")
+ }
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql b/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql
index 77cd41901d3..dba4eec1cde 100644
--- a/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql
+++ b/regression-test/suites/load_p0/routine_load/ddl/dup_tbl_basic_create.sql
@@ -58,7 +58,7 @@ PARTITION BY RANGE(k01)
PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
)
-DISTRIBUTED BY HASH(k00) BUCKETS 32
+DISTRIBUTED BY RANDOM BUCKETS 32
PROPERTIES (
"bloom_filter_columns"="k05",
"replication_num" = "1"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]