This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6b31a7db6c5 [branch-2.0](colocate group) fix colocate group always
exclude the same host #33823 (#36503)
6b31a7db6c5 is described below
commit 6b31a7db6c54c6ea28d7e65604ae74d63d58f6b0
Author: yujun <[email protected]>
AuthorDate: Thu Jun 20 17:52:19 2024 +0800
[branch-2.0](colocate group) fix colocate group always exclude the same
host #33823 (#36503)
---
.../clone/ColocateTableCheckerAndBalancer.java | 25 ++++++-
.../doris/cluster/DecommissionBackendTest.java | 86 +++++++++++++++++++++-
.../apache/doris/utframe/TestWithFeService.java | 6 +-
3 files changed, 110 insertions(+), 7 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 456701213ba..1a085ab8104 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -859,6 +859,8 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
int targetSeqIndex = -1;
long minDataSizeDiff = Long.MAX_VALUE;
+ boolean destBeContainsAllBuckets = true;
+ boolean theSameHostContainsAllBuckets = true;
for (int seqIndex : seqIndexes) {
// the bucket index.
// eg: 0 / 3 = 0, so that the bucket index of the 4th
backend id in flatBackendsPerBucketSeq is 0.
@@ -866,9 +868,15 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
List<Long> backendsSet =
backendsPerBucketSeq.get(bucketIndex);
List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
// the replicas of a tablet can not locate in same Backend
or same host
- if (backendsSet.contains(destBeId) ||
hostsSet.contains(destBe.getHost())) {
+ if (backendsSet.contains(destBeId)) {
continue;
}
+ destBeContainsAllBuckets = false;
+
+ if (!Config.allow_replica_on_same_host &&
hostsSet.contains(destBe.getHost())) {
+ continue;
+ }
+ theSameHostContainsAllBuckets = false;
Preconditions.checkState(backendsSet.contains(srcBeId),
srcBeId);
long bucketDataSize =
@@ -895,8 +903,19 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
if (targetSeqIndex < 0) {
// we use next node as dst node
- LOG.info("unable to replace backend {} with backend {} in
colocate group {}",
- srcBeId, destBeId, groupId);
+ String failedReason;
+ if (destBeContainsAllBuckets) {
+ failedReason = "dest be contains all the same buckets";
+ } else if (theSameHostContainsAllBuckets) {
+ failedReason = "dest be's host contains all the same
buckets "
+ + "and
Config.allow_replica_on_same_host=false";
+ } else {
+ failedReason = "dest be has no fit path, maybe disk
usage is exceeds "
+ +
"Config.storage_high_watermark_usage_percent";
+ }
+ LOG.info("unable to replace backend {} with dest backend
{} in colocate group {}, "
+ + "failed reason: {}",
+ srcBeId, destBeId, groupId, failedReason);
continue;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index e689723cdf8..79216f28c40 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -22,6 +22,10 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.clone.RebalancerTestUtil;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
@@ -39,7 +43,7 @@ import java.util.List;
public class DecommissionBackendTest extends TestWithFeService {
@Override
protected int backendNum() {
- return 3;
+ return 4;
}
@Override
@@ -56,10 +60,14 @@ public class DecommissionBackendTest extends
TestWithFeService {
@Override
protected void beforeCreatingConnectContext() throws Exception {
FeConstants.default_scheduler_interval_millisecond = 1000;
- FeConstants.tablet_checker_interval_ms = 1000;
+ FeConstants.tablet_checker_interval_ms = 100;
+ FeConstants.tablet_schedule_interval_ms = 100;
Config.tablet_repair_delay_factor_second = 1;
Config.allow_replica_on_same_host = true;
Config.disable_balance = true;
+ Config.schedule_batch_size = 1000;
+ Config.schedule_slot_num_per_hdd_path = 1000;
+ FeConstants.heartbeat_interval_second = 5;
}
@Test
@@ -76,6 +84,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
// 3. create table tbl1
createTable("create table db1.tbl1(k1 int) distributed by hash(k1)
buckets 3 properties('replication_num' = '1');");
+ RebalancerTestUtil.updateReplicaPathHash();
// 4. query tablet num
int tabletNum =
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
@@ -133,6 +142,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
createTable("create table db2.tbl1(k1 int) distributed by hash(k1)
buckets 3 properties('replication_num' = '"
+ availableBeNum + "');");
createTable("create table db2.tbl2(k1 int) distributed by hash(k1)
buckets 3 properties('replication_num' = '1');");
+ RebalancerTestUtil.updateReplicaPathHash();
// 4. query tablet num
int tabletNum =
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
@@ -180,9 +190,81 @@ public class DecommissionBackendTest extends
TestWithFeService {
// recover tbl1, because tbl1 has more than one replica, so it still
can be recovered
Assertions.assertDoesNotThrow(() -> recoverTable("db2.tbl1"));
Assertions.assertDoesNotThrow(() -> showCreateTable(sql));
+ dropTable("db2.tbl1", false);
addNewBackend();
Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getIdToBackend().size());
}
+ @Test
+ public void testDecommissionBackendWithColocateGroup() throws Exception {
+ // 1. create connect context
+ connectContext = createDefaultCtx();
+
+ ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getIdToBackend();
+ Assertions.assertEquals(backendNum(), idToBackendRef.size());
+
+ // 2. create database db4
+ createDatabase("db4");
+ System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+ // 3. create table
+ createTable("CREATE TABLE db4.table1 (\n"
+ + " `c1` varchar(20) NULL,\n"
+ + " `c2` bigint(20) NULL,\n"
+ + " `c3` int(20) not NULL,\n"
+ + " `k4` bitmap BITMAP_UNION NULL,\n"
+ + " `k5` bitmap BITMAP_UNION NULL\n"
+ + ") ENGINE=OLAP\n"
+ + "AGGREGATE KEY(`c1`, `c2`, `c3`)\n"
+ + "COMMENT 'OLAP'\n"
+ + "DISTRIBUTED BY HASH(`c2`) BUCKETS 20\n"
+ + "PROPERTIES(\n"
+ + " 'colocate_with' = 'foo',\n"
+ + " 'replication_num' = '3'\n"
+ + ")"
+ + ";");
+
+ RebalancerTestUtil.updateReplicaPathHash();
+
+ Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db4");
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException("table1");
+ Assertions.assertNotNull(tbl);
+
+ Partition partition = tbl.getPartitions().iterator().next();
+ Tablet tablet =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
+ .iterator().next().getTablets().iterator().next();
+ Assertions.assertNotNull(tablet);
+ Backend srcBackend =
Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId());
+ Assertions.assertNotNull(srcBackend);
+
+ // 4. query tablet num
+ int tabletNum =
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
+
+ String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:"
+ + srcBackend.getHeartbeatPort() + "\"";
+ AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
+
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+ Assertions.assertTrue(srcBackend.isDecommissioned());
+ long startTimestamp = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTimestamp < 90000
+ &&
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+ Thread.sleep(1000);
+ }
+
+ Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getIdToBackend().size());
+
+ // For now, we have pre-built internal table: analysis_job and
column_statistics
+ Assertions.assertEquals(tabletNum,
+ Env.getCurrentInvertedIndex().getTabletMetaMap().size());
+
+ for (Replica replica : tablet.getReplicas()) {
+ Assertions.assertTrue(replica.getBackendId() !=
srcBackend.getId());
+ }
+
+ // 6. add backend
+ addNewBackend();
+ Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getIdToBackend().size());
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 41eb4e6c4e1..c61fcd28afe 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -405,7 +405,7 @@ public abstract class TestWithFeService {
}
private void checkBEHeartbeat(List<Backend> bes) throws
InterruptedException {
- int maxTry = 10;
+ int maxTry = FeConstants.heartbeat_interval_second + 5;
boolean allAlive = false;
while (maxTry-- > 0 && !allAlive) {
Thread.sleep(1000);
@@ -437,7 +437,9 @@ public abstract class TestWithFeService {
}
protected Backend addNewBackend() throws IOException, InterruptedException
{
- return createBackend("127.0.0.1", lastFeRpcPort);
+ Backend be = createBackend("127.0.0.1", lastFeRpcPort);
+ checkBEHeartbeat(Lists.newArrayList(be));
+ return be;
}
protected Backend createBackend(String beHost, int feRpcPort) throws
IOException, InterruptedException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]