This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 496befd2926 [improvement](decommission be) decommission check replica
num (#32748)
496befd2926 is described below
commit 496befd292698102e842a27d523daaf70b46a328
Author: yujun <[email protected]>
AuthorDate: Thu Mar 28 09:37:54 2024 +0800
[improvement](decommission be) decommission check replica num (#32748)
---
.../java/org/apache/doris/alter/SystemHandler.java | 84 +++++++++++++++++++++-
.../test_decommission_with_replica_num_fail.groovy | 59 +++++++++++++++
regression-test/suites/node_p0/test_backend.groovy | 26 ++++---
3 files changed, 158 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index e503e093787..57e00f5ab14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -33,24 +33,34 @@ import org.apache.doris.analysis.ModifyBrokerClause;
import org.apache.doris.analysis.ModifyFrontendHostNameClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MysqlCompatibleDatabase;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/*
* SystemHandler is for
@@ -235,7 +245,8 @@ public class SystemHandler extends AlterHandler {
decommissionBackends.add(backend);
}
- // TODO(cmy): check if replication num can be met
+ checkDecommissionWithReplicaAllocation(decommissionBackends);
+
// TODO(cmy): check remaining space
return decommissionBackends;
@@ -258,12 +269,81 @@ public class SystemHandler extends AlterHandler {
decommissionBackends.add(backend);
}
- // TODO(cmy): check if replication num can be met
+ checkDecommissionWithReplicaAllocation(decommissionBackends);
+
// TODO(cmy): check remaining space
return decommissionBackends;
}
+    /**
+     * Rejects a decommission request when, after removing the given backends, some partition
+     * would allocate more replicas on a location tag than there are schedule-available
+     * backends left on that tag.
+     *
+     * <p>The check is skipped in cloud mode, when {@code decommissionBackends} is empty, or when
+     * the debug point {@code SystemHandler.decommission_no_check_replica_num} is enabled.
+     *
+     * @param decommissionBackends backends requested for decommission
+     * @throws DdlException if a partition's replica count on an affected tag exceeds the number
+     *         of backends that would remain available on that tag
+     */
+    private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends)
+            throws DdlException {
+        if (Config.isCloudMode() || decommissionBackends.isEmpty()
+                || DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) {
+            return;
+        }
+
+        // Only tags that lose a backend can become under-provisioned, so allocations on other
+        // tags are not checked.
+        Set<Tag> decommissionTags = decommissionBackends.stream()
+                .map(Backend::getLocationTag)
+                .collect(Collectors.toSet());
+        Map<Tag, Integer> tagAvailBackendNums = countAvailableBackendsByTag(decommissionBackends);
+
+        Env env = Env.getCurrentEnv();
+        for (Long dbId : env.getInternalCatalog().getDbIds()) {
+            Database db = env.getInternalCatalog().getDbNullable(dbId);
+            // Skip databases that no longer resolve and MysqlCompatibleDatabase instances.
+            if (db == null || db instanceof MysqlCompatibleDatabase) {
+                continue;
+            }
+            for (Table table : db.getTables()) {
+                checkTableReplicaAllocation(db, table, decommissionTags, tagAvailBackendNums);
+            }
+        }
+    }
+
+    /**
+     * Counts schedule-available backends per location tag, excluding the backends about to be
+     * decommissioned. Backends without a location tag are not counted.
+     */
+    private static Map<Tag, Integer> countAvailableBackendsByTag(List<Backend> decommissionBackends) {
+        // Set lookup instead of scanning the decommission list once per backend.
+        Set<Long> decommissionIds = decommissionBackends.stream()
+                .map(Backend::getId)
+                .collect(Collectors.toSet());
+        Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
+        for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+            if (!backend.isScheduleAvailable() || decommissionIds.contains(backend.getId())) {
+                continue;
+            }
+            Tag tag = backend.getLocationTag();
+            if (tag != null) {
+                tagAvailBackendNums.merge(tag, 1, Integer::sum);
+            }
+        }
+        return tagAvailBackendNums;
+    }
+
+    /**
+     * Throws if any partition of {@code table} allocates more replicas on one of
+     * {@code decommissionTags} than {@code tagAvailBackendNums} has backends for that tag.
+     * Tables whose {@code needSchedule()} is false are ignored. The table's read lock is held
+     * for the whole scan and released in {@code finally}, including when the exception is thrown.
+     */
+    private static void checkTableReplicaAllocation(Database db, Table table, Set<Tag> decommissionTags,
+            Map<Tag, Integer> tagAvailBackendNums) throws DdlException {
+        table.readLock();
+        try {
+            if (!table.needSchedule()) {
+                return;
+            }
+            // NOTE(review): assumes only OlapTable (and subclasses) report needSchedule() == true.
+            OlapTable tbl = (OlapTable) table;
+            for (Partition partition : tbl.getAllPartitions()) {
+                ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
+                for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
+                    Tag tag = entry.getKey();
+                    if (!decommissionTags.contains(tag)) {
+                        continue;
+                    }
+                    int replicaNum = entry.getValue();
+                    int backendNum = tagAvailBackendNums.getOrDefault(tag, 0);
+                    if (replicaNum > backendNum) {
+                        throw new DdlException("After decommission, partition " + partition.getName()
+                                + " of table " + db.getName() + "." + tbl.getName()
+                                + "'s replication allocation { " + replicaAlloc
+                                + " } > available backend num " + backendNum + " on tag " + tag
+                                + ", otherwise need to decrease the partition's replication num.");
+                    }
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+    }
+
@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt)
stmt;
diff --git
a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
new file mode 100644
index 00000000000..ff19adae27d
--- /dev/null
+++
b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('test_decommission_with_replica_num_fail') {
+    if (isCloudMode()) {
+        return
+    }
+
+    def tbl = 'test_decommission_with_replica_num_fail'
+    // Give the table one replica per alive, non-decommissioned backend, so that removing any
+    // one of them leaves fewer backends than replication_num. The last such backend is the target.
+    def backends = sql_return_maparray('show backends')
+    def replicaNum = 0
+    def targetBackend = null
+    for (def be : backends) {
+        def alive = be.Alive.toBoolean()
+        def decommissioned = be.SystemDecommissioned.toBoolean()
+        if (alive && !decommissioned) {
+            replicaNum++
+            targetBackend = be
+        }
+    }
+    assertTrue(replicaNum > 0)
+
+    sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+    sql """
+        CREATE TABLE ${tbl}
+        (
+            k1 int,
+            k2 int
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 6
+        PROPERTIES
+        (
+            "replication_num" = "${replicaNum}"
+        );
+    """
+    def target = "${targetBackend.Host}:${targetBackend.HeartbeatPort}"
+    try {
+        // The decommission must be rejected by the replica-num check.
+        test {
+            sql "ALTER SYSTEM DECOMMISSION BACKEND '${target}'"
+            exception "otherwise need to decrease the partition's replication num"
+        }
+    } finally {
+        try {
+            // Undo the decommission in case the check did not reject it.
+            sql "CANCEL DECOMMISSION BACKEND '${target}'"
+        } finally {
+            // Drop the table even when the assertion or the cancel above fails.
+            try_sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+        }
+    }
+}
diff --git a/regression-test/suites/node_p0/test_backend.groovy
b/regression-test/suites/node_p0/test_backend.groovy
index 1fe6f802e90..cce111b0a19 100644
--- a/regression-test/suites/node_p0/test_backend.groovy
+++ b/regression-test/suites/node_p0/test_backend.groovy
@@ -41,11 +41,12 @@ suite("test_backend", "nonConcurrent") {
}
if (context.config.jdbcUser.equals("root")) {
+ def beId1 = null
try {
+
GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num");
try_sql """admin set frontend
config("drop_backend_after_decommission" = "false")"""
def result = sql_return_maparray """SHOW BACKENDS;"""
logger.info("show backends result:${result}")
- def beId1 = null
for (def res : result) {
beId1 = res.BackendId
break
@@ -58,16 +59,23 @@ suite("test_backend", "nonConcurrent") {
assertTrue(res.SystemDecommissioned.toBoolean())
}
}
- result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """
- logger.info("CANCEL DECOMMISSION BACKEND ${result}")
- result = sql_return_maparray """SHOW BACKENDS;"""
- for (def res : result) {
- if (res.BackendId == "${beId1}") {
- assertFalse(res.SystemDecommissioned.toBoolean())
+ } finally {
+ try {
+ if (beId1 != null) {
+ def result = sql """CANCEL DECOMMISSION BACKEND "${beId1}"
"""
+ logger.info("CANCEL DECOMMISSION BACKEND ${result}")
+
+ result = sql_return_maparray """SHOW BACKENDS;"""
+ for (def res : result) {
+ if (res.BackendId == "${beId1}") {
+ assertFalse(res.SystemDecommissioned.toBoolean())
+ }
+ }
}
+ } finally {
+
GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num');
+ try_sql """admin set frontend
config("drop_backend_after_decommission" = "true")"""
}
- } finally {
- try_sql """admin set frontend
config("drop_backend_after_decommission" = "true")"""
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]