This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 72febaaf07b branch-3.1: [bug](auto partition) Fix be crash with single
replica insert #48101 (#54120)
72febaaf07b is described below
commit 72febaaf07b7d121005acc3594e881ccb290f1ea
Author: hui lai <[email protected]>
AuthorDate: Thu Jul 31 11:29:26 2025 +0800
branch-3.1: [bug](auto partition) Fix be crash with single replica insert
#48101 (#54120)
pick #48101
Co-authored-by: xy720 <[email protected]>
---
be/src/runtime/tablets_channel.cpp | 14 ++--
be/src/vec/sink/vrow_distribution.cpp | 3 +
be/src/vec/sink/vrow_distribution.h | 3 +
be/src/vec/sink/writer/vtablet_writer.cpp | 11 +--
.../apache/doris/service/FrontendServiceImpl.java | 35 ++++++++-
gensrc/thrift/FrontendService.thrift | 4 ++
...t_auto_partition_with_single_replica_insert.csv | 78 +++++++++++++++++++++
...t_auto_partition_with_single_replica_insert.out | Bin 0 -> 7985 bytes
...uto_partition_with_single_replica_insert.groovy | 78 +++++++++++++++++++++
9 files changed, 215 insertions(+), 11 deletions(-)
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 3879b664ed8..c20c3394a2c 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -370,8 +370,6 @@ Status TabletsChannel::close(LoadChannel* parent, const
PTabletWriterAddBlockReq
// 5. commit all writers
for (auto* writer : need_wait_writers) {
- PSlaveTabletNodes slave_nodes;
-
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE judge
it.
_commit_txn(writer, req, res);
@@ -408,9 +406,15 @@ Status TabletsChannel::close(LoadChannel* parent, const
PTabletWriterAddBlockReq
void TabletsChannel::_commit_txn(DeltaWriter* writer, const
PTabletWriterAddBlockRequest& req,
PTabletWriterAddBlockResult* res) {
- Status st = writer->commit_txn(_write_single_replica
- ?
req.slave_tablet_nodes().at(writer->tablet_id())
- : PSlaveTabletNodes {});
+ PSlaveTabletNodes slave_nodes;
+ if (_write_single_replica) {
+ auto& nodes_map = req.slave_tablet_nodes();
+ auto it = nodes_map.find(writer->tablet_id());
+ if (it != nodes_map.end()) {
+ slave_nodes = it->second;
+ }
+ }
+ Status st = writer->commit_txn(slave_nodes);
if (st.ok()) [[likely]] {
auto* tablet_vec = res->mutable_tablet_vec();
PTabletInfo* tablet_info = tablet_vec->Add();
diff --git a/be/src/vec/sink/vrow_distribution.cpp
b/be/src/vec/sink/vrow_distribution.cpp
index 34ef9565dba..1f0bdd96b85 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -100,6 +100,7 @@ Status VRowDistribution::automatic_create_partition() {
request.__set_table_id(_vpartition->table_id());
request.__set_partitionValues(_partitions_need_create);
request.__set_be_endpoint(be_endpoint);
+ request.__set_write_single_replica(_write_single_replica);
VLOG_NOTICE << "automatic partition rpc begin request " << request;
TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
@@ -133,6 +134,7 @@ static TCreatePartitionResult
cast_as_create_result(TReplacePartitionResult& arg
result.nodes = std::move(arg.nodes);
result.partitions = std::move(arg.partitions);
result.tablets = std::move(arg.tablets);
+ result.slave_tablets = std::move(arg.slave_tablets);
return result;
}
@@ -144,6 +146,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
request.__set_db_id(_vpartition->db_id());
request.__set_table_id(_vpartition->table_id());
+ request.__set_write_single_replica(_write_single_replica);
// only request for partitions not recorded for replacement
std::set<int64_t> id_deduper;
diff --git a/be/src/vec/sink/vrow_distribution.h
b/be/src/vec/sink/vrow_distribution.h
index 9e4cce6b528..429b67bb068 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -79,6 +79,7 @@ public:
const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
std::shared_ptr<OlapTableSchemaParam> schema;
void* caller = nullptr;
+ bool write_single_replica = false;
CreatePartitionCallback create_partition_callback;
};
friend class VTabletWriter;
@@ -100,6 +101,7 @@ public:
_vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;
_schema = ctx.schema;
_caller = ctx.caller;
+ _write_single_replica = ctx.write_single_replica;
_create_partition_callback = ctx.create_partition_callback;
}
@@ -219,6 +221,7 @@ private:
CreatePartitionCallback _create_partition_callback = nullptr;
void* _caller = nullptr;
std::shared_ptr<OlapTableSchemaParam> _schema;
+ bool _write_single_replica = false;
// reuse for find_tablet. save partitions found by find_tablets
std::vector<VOlapTablePartition*> _partitions;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index d14d603f6fa..f6d8b844b18 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -126,8 +126,8 @@ Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPart
if (incremental) {
_has_inc_node = true;
}
- VLOG_CRITICAL << "init new node for instance " <<
_parent->_sender_id
- << ", incremantal:" << incremental;
+ LOG(INFO) << "init new node for instance " <<
_parent->_sender_id
+ << ", node id:" << replica_node_id << ",
incremantal:" << incremental;
} else {
channel = it->second;
}
@@ -1244,7 +1244,8 @@ void VNodeChannel::mark_close(bool hang_wait) {
DCHECK(_pending_blocks.back().second->eos());
_close_time_ms = UnixMillis();
LOG(INFO) << channel_info()
- << " mark closed, left pending batch size: " <<
_pending_blocks.size();
+ << " mark closed, left pending batch size: " <<
_pending_blocks.size()
+ << " hang_wait: " << hang_wait;
}
_eos_is_produced = true;
@@ -1367,7 +1368,8 @@ Status
VTabletWriter::on_partitions_created(TCreatePartitionResult* result) {
auto* new_locations = _pool->add(new
std::vector<TTabletLocation>(result->tablets));
_location->add_locations(*new_locations);
if (_write_single_replica) {
- _slave_location->add_locations(*new_locations);
+ auto* slave_locations = _pool->add(new
std::vector<TTabletLocation>(result->slave_tablets));
+ _slave_location->add_locations(*slave_locations);
}
// update new node info
@@ -1395,6 +1397,7 @@ Status VTabletWriter::_init_row_distribution() {
.vec_output_expr_ctxs = &_vec_output_expr_ctxs,
.schema = _schema,
.caller = this,
+ .write_single_replica = _write_single_replica,
.create_partition_callback =
&vectorized::on_partitions_created});
return _row_distribution.open(_output_row_desc);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 0b0ea7e8ff3..2883bc9c5c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -282,6 +282,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -290,6 +291,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.IOException;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -299,6 +301,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -3726,6 +3729,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// build partition & tablets
List<TOlapTablePartition> partitions = Lists.newArrayList();
List<TTabletLocation> tablets = Lists.newArrayList();
+ List<TTabletLocation> slaveTablets = new ArrayList<>();
for (String partitionName : addPartitionClauseMap.keySet()) {
Partition partition = table.getPartition(partitionName);
TOlapTablePartition tPartition = new TOlapTablePartition();
@@ -3771,12 +3775,25 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
if (bePathsMap.keySet().size() < quorum) {
LOG.warn("auto go quorum exception");
}
- tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(bePathsMap.keySet())));
+ if (request.isSetWriteSingleReplica() &&
request.isWriteSingleReplica()) {
+ Long[] nodes = bePathsMap.keySet().toArray(new
Long[0]);
+ Random random = new SecureRandom();
+ Long masterNode = nodes[random.nextInt(nodes.length)];
+ Multimap<Long, Long> slaveBePathsMap = bePathsMap;
+ slaveBePathsMap.removeAll(masterNode);
+ tablets.add(new TTabletLocation(tablet.getId(),
+
Lists.newArrayList(Sets.newHashSet(masterNode))));
+ slaveTablets.add(new TTabletLocation(tablet.getId(),
+ Lists.newArrayList(slaveBePathsMap.keySet())));
+ } else {
+ tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(bePathsMap.keySet())));
+ }
}
}
}
result.setPartitions(partitions);
result.setTablets(tablets);
+ result.setSlaveTablets(slaveTablets);
// build nodes
List<TNodeInfo> nodeInfos = Lists.newArrayList();
@@ -3932,6 +3949,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// so they won't be changed again. if other transaction changing it.
just let it fail.
List<TOlapTablePartition> partitions = new ArrayList<>();
List<TTabletLocation> tablets = new ArrayList<>();
+ List<TTabletLocation> slaveTablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (long partitionId : resultPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
@@ -3980,12 +3998,25 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
if (bePathsMap.keySet().size() < quorum) {
LOG.warn("auto go quorum exception");
}
- tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(bePathsMap.keySet())));
+ if (request.isSetWriteSingleReplica() &&
request.isWriteSingleReplica()) {
+ Long[] nodes = bePathsMap.keySet().toArray(new
Long[0]);
+ Random random = new SecureRandom();
+ Long masterNode = nodes[random.nextInt(nodes.length)];
+ Multimap<Long, Long> slaveBePathsMap = bePathsMap;
+ slaveBePathsMap.removeAll(masterNode);
+ tablets.add(new TTabletLocation(tablet.getId(),
+
Lists.newArrayList(Sets.newHashSet(masterNode))));
+ slaveTablets.add(new TTabletLocation(tablet.getId(),
+ Lists.newArrayList(slaveBePathsMap.keySet())));
+ } else {
+ tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(bePathsMap.keySet())));
+ }
}
}
}
result.setPartitions(partitions);
result.setTablets(tablets);
+ result.setSlaveTablets(slaveTablets);
// build nodes
List<TNodeInfo> nodeInfos = Lists.newArrayList();
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 12f247b4733..39d625dcff0 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1560,6 +1560,7 @@ struct TCreatePartitionRequest {
4: optional list<list<Exprs.TNullableStringLiteral>> partitionValues
// be_endpoint = <ip>:<heartbeat_port> to distinguish a particular BE
5: optional string be_endpoint
+ 6: optional bool write_single_replica = false
}
struct TCreatePartitionResult {
@@ -1567,6 +1568,7 @@ struct TCreatePartitionResult {
2: optional list<Descriptors.TOlapTablePartition> partitions
3: optional list<Descriptors.TTabletLocation> tablets
4: optional list<Descriptors.TNodeInfo> nodes
+ 5: optional list<Descriptors.TTabletLocation> slave_tablets
}
// these two for auto detect replacing partition
@@ -1577,6 +1579,7 @@ struct TReplacePartitionRequest {
4: optional list<i64> partition_ids // partition to replace.
// be_endpoint = <ip>:<heartbeat_port> to distinguish a particular BE
5: optional string be_endpoint
+ 6: optional bool write_single_replica = false
}
struct TReplacePartitionResult {
@@ -1584,6 +1587,7 @@ struct TReplacePartitionResult {
2: optional list<Descriptors.TOlapTablePartition> partitions
3: optional list<Descriptors.TTabletLocation> tablets
4: optional list<Descriptors.TNodeInfo> nodes
+ 5: optional list<Descriptors.TTabletLocation> slave_tablets
}
struct TGetMetaReplica {
diff --git
a/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.csv
b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.csv
new file mode 100644
index 00000000000..747ac6788be
--- /dev/null
+++
b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.csv
@@ -0,0 +1,78 @@
+-3590935922607536626,-,2025-02-16,星辰医疗科技有限公司
+-3590935906895636626,-,2025-02-16,未来健康产业
+123,-,2025-02-16,蓝海生物有限公司
+100000048812501,-,2025-02-16,阳光医疗集团
+1000000076784258,-,2025-02-16,华夏健康科技
+1000022060522735,-,2025-02-16,瑞丰生物医药
+1000022193719484,-,2025-02-16,盛世医疗服务
+1000031422678924,-,2025-02-16,康宁健康有限公司
+1000085651028900,-,2025-02-16,史前生物科技
+1000093620518989,-,2025-02-16,健康之路公司
+1000103471774704,-,2025-02-16,安康医疗科技
+1000138777615262,-,2025-02-16,福瑞堂生物
+1000156823071129,-,2025-02-16,优质生活科技
+1000191711015262,-,2025-02-16,健康未来企业
+1000633041475486,-,2025-02-16,天使医疗集团
+1000681518627336,-,2025-02-16,百年健康公司
+1002458253925730,-,2025-02-16,康乐园生物科技
+1008126191610424,-,2025-02-16,华康医药有限公司
+1071904784424147,-,2025-02-16,金桥健康产业
+1076564522324147,-,2025-02-16,乐活医疗科技
+1202217708798485,-,2025-02-16,健康家园公司
+1224474148903456,-,2025-02-16,康泰生物科技
+2043829367811999,-,2025-02-16,未来医疗集团
+2191851926844270,-,2025-02-16,健康之源公司
+2232379824950609,-,2025-02-16,安宁医药有限公司
+2350341369782152,-,2025-02-16,和谐生物科技
+2548383917911403,-,2025-02-16,康健医疗服务
+2640774381717600,-,2025-02-16,瑞康医药有限公司
+2754625269782961,-,2025-02-16,乐享健康产业
+3064667398063809,-,2025-02-16,健康先锋公司
+3102689636972458,-,2025-02-16,安康生物科技
+3291916164371209,-,2025-02-16,未来之星医疗
+3946909802002976,-,2025-02-16,健康梦想公司
+3965513055005942,-,2025-02-16,康乐生物科技
+4143117309325214,-,2025-02-16,安宁健康产业
+4175970196426577,-,2025-02-16,乐活医疗集团
+4294566787233969,-,2025-02-16,健康之道公司
+4610682351457207,-,2025-02-16,瑞丰医疗科技
+4674640812462217,-,2025-02-16,未来健康企业
+4676494238858307,-,2025-02-16,安康医药有限公司
+4937264996861701,-,2025-02-16,乐享健康公司
+4947288173569190,-,2025-02-16,康宁医疗集团
+5115179098054305,-,2025-02-16,健康家园科技
+10000000430024147,-,2025-02-16,阳光医疗有限公司
+10000021073673208,-,2025-02-16,未来生物科技
+10000130032642122,-,2025-02-16,和谐健康产业
+10000365660973707,-,2025-02-16,安宁医药公司
+10000453096993544,-,2025-02-16,傻乐生物科技
+10000789012345678,-,2025-02-16,星辉医疗科技
+10000890123456789,-,2025-02-16,未来健康服务
+10000901234567890,-,2025-02-16,蓝天生物科技
+10001012345678901,-,2025-02-16,阳光健康产业
+10001123456789012,-,2025-02-16,华康医疗集团
+10001234567890123,-,2025-02-16,易丰生物医药
+10001345678901234,-,2025-02-16,盛世健康科技
+10001456789012345,-,2025-02-16,康宁医疗服务
+10001567890123456,-,2025-02-16,和谐生物公司
+10001678901234567,-,2025-02-16,健康之路科技
+10001789012345678,-,2025-02-16,安康生物产业
+10001890123456789,-,2025-02-16,福瑞堂医疗
+10001901234567890,-,2025-02-16,未来生活科技
+10002012345678901,-,2025-02-16,美好未来企业
+10002123456789012,-,2025-02-16,金地医疗集团
+10002234567890123,-,2025-02-16,老头健康公司
+10002345678901234,-,2025-02-16,平安园生物科技
+10002456789012345,-,2025-02-16,闪电医药有限公司
+10002567890123456,-,2025-02-16,铜桥健康产业
+10002678901234567,-,2025-02-16,乐天医疗科技
+10002789012345678,-,2025-02-16,健康成长公司
+10002890123456789,-,2025-02-16,尖端生物科技
+10002901234567890,-,2025-02-16,保护伞医疗集团
+10003012345678901,-,2025-02-16,青春之源公司
+10003123456789012,-,2025-02-16,大森林医药有限公司
+10003234567890123,-,2025-02-16,毒蛇生物科技
+10003345678901234,-,2025-02-16,金地医疗服务
+10003456789012345,-,2025-02-16,瑞丰医药有限公司
+10003567890123456,-,2025-02-16,乐游娱乐产业
+10003678901234567,-,2025-02-16,康岩先锋公司
diff --git
a/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.out
b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.out
new file mode 100644
index 00000000000..e565de71e73
Binary files /dev/null and
b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.out
differ
diff --git
a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.groovy
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.groovy
new file mode 100644
index 00000000000..510b24602ac
--- /dev/null
+++
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_auto_partition_with_single_replica_insert") { // regression for #48101: auto partition combined with single-replica insert
+ def tableName1 = "test_auto_partition_with_single_replica_insert_1"
+ def tableName2 = "test_auto_partition_with_single_replica_insert_2"
+ sql "drop table if exists ${tableName1}"
+ sql """
+ CREATE TABLE `${tableName1}` (
+ `user_id` varchar(100) NULL,
+ `goods_id` varchar(100) NULL,
+ `dates` date NULL,
+ `chain_name` varchar(100) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`, `goods_id`, `dates`)
+ COMMENT 'OLAP'
+ AUTO PARTITION BY LIST (`chain_name`)
+ (PARTITION pchain5fname10 VALUES IN ("chain_name"),
+ PARTITION p4e0995e85ce1534e4e3a5 VALUES IN ("星辰医疗科技有限公司"))
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ streamLoad {
+ table "${tableName1}"
+ set 'column_separator', ','
+ file "test_auto_partition_with_single_replica_insert.csv"
+ time 20000
+ }
+ sql " sync "
+ qt_select1 "select * from ${tableName1} order by user_id"
+ def result1 = sql "show partitions from ${tableName1}"
+ logger.info("${result1}")
+ assertEquals(result1.size(), 79) // 2 predefined partitions + one auto-created per other distinct chain_name in the CSV
+
+ sql "drop table if exists ${tableName2}"
+ sql """
+ CREATE TABLE `${tableName2}` (
+ `user_id` varchar(100) NULL,
+ `goods_id` varchar(100) NULL,
+ `dates` date NULL,
+ `chain_name` varchar(100) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`, `goods_id`, `dates`)
+ COMMENT 'OLAP'
+ AUTO PARTITION BY LIST (`chain_name`)
+ (PARTITION pchain5fname10 VALUES IN ("chain_name"),
+ PARTITION p4e0995e85ce1534e4e3a5 VALUES IN ("星辰医疗科技有限公司"))
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ sql """set experimental_enable_nereids_planner = true"""
+ sql """set enable_memtable_on_sink_node = false"""
+ sql """set experimental_enable_single_replica_insert = true"""
+ sql "insert into ${tableName2} select user_id, goods_id, dates, chain_name from ${tableName1}" // partitions are auto-created during a single-replica insert
+ sql " sync "
+ qt_select2 "select * from ${tableName2} order by user_id"
+ def result2 = sql "show partitions from ${tableName2}" // must inspect table 2, the single-replica insert target
+ logger.info("${result2}")
+ assertEquals(result2.size(), 79) // same rows as table 1, so the same partition count is expected
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]