This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 97105e9a16f [regression](compaction) Add case to test single replica
compaction (#27199)
97105e9a16f is described below
commit 97105e9a16fb5df8d5e1c3885bff8a815f51d400
Author: Xiaocc <[email protected]>
AuthorDate: Thu Nov 30 21:27:13 2023 +0800
[regression](compaction) Add case to test single replica compaction (#27199)
---
be/src/olap/tablet.cpp | 14 ++
.../compaction/test_single_replica_compaction.out | 10 +
.../test_single_replica_compaction.groovy | 248 +++++++++++++++++++++
3 files changed, 272 insertions(+)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index fad40012b0a..bbd57cf1b51 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1449,6 +1449,20 @@ void Tablet::get_compaction_status(std::string*
json_result) {
root.GetAllocator());
root.AddMember("last base status", base_compaction_status_value,
root.GetAllocator());
+ TReplicaInfo replica_info;
+ std::string dummp_token;
+ rapidjson::Value fetch_addr;
+ if (tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
+ StorageEngine::instance()->get_peer_replica_info(tablet_id(),
&replica_info,
+ &dummp_token)) {
+ std::string addr = replica_info.host + ":" +
std::to_string(replica_info.brpc_port);
+ fetch_addr.SetString(addr.c_str(), addr.length(), root.GetAllocator());
+ } else {
+ // -1 means do compaction locally
+ fetch_addr.SetString("-1", root.GetAllocator());
+ }
+ root.AddMember("fetch from peer", fetch_addr, root.GetAllocator());
+
// print all rowsets' version as an array
rapidjson::Document versions_arr;
rapidjson::Document missing_versions_arr;
diff --git a/regression-test/data/compaction/test_single_replica_compaction.out
b/regression-test/data/compaction/test_single_replica_compaction.out
new file mode 100644
index 00000000000..9895dbe9e11
--- /dev/null
+++ b/regression-test/data/compaction/test_single_replica_compaction.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 b 100
+2 b 100
+3 b 100
+5 a 100
+6 a 100
+7 a 100
+8 a 100
+
diff --git
a/regression-test/suites/compaction/test_single_replica_compaction.groovy
b/regression-test/suites/compaction/test_single_replica_compaction.groovy
new file mode 100644
index 00000000000..4e3fba73660
--- /dev/null
+++ b/regression-test/suites/compaction/test_single_replica_compaction.groovy
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_single_replica_compaction", "p2") {
+ def tableName = "test_single_replica_compaction"
+
+ def set_be_config = { key, value ->
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
+ boolean disableAutoCompaction = true
+ boolean has_update_be_config = false
+ try {
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "disable_auto_compaction") {
+ disableAutoCompaction = Boolean.parseBoolean(((List<String>)
ele)[2])
+ }
+ }
+ set_be_config.call("disable_auto_compaction", "true")
+ has_update_be_config = true
+
+ def triggerCompaction = { be_host, be_http_port, compact_type,
tablet_id ->
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X POST http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/run?tablet_id=")
+ sb.append(tablet_id)
+ sb.append("&compact_type=${compact_type}")
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())));
+ out = process.getText()
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
+ if (!disableAutoCompaction) {
+ return "Success, " + out
+ }
+ assertEquals(code, 0)
+ return out
+ }
+
+ def waitForCompaction = { be_host, be_http_port, tablet_id ->
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/run_status?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ out = process.getText()
+ logger.info("Get compaction status: code=" + code + ", out=" +
out)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+ def getTabletStatus = { be_host, be_http_port, tablet_id ->
+ boolean running = true
+ Thread.sleep(1000)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ out = process.getText()
+ logger.info("Get tablet status: code=" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def tabletStatus = parseJson(out.trim())
+ return tabletStatus
+ }
+
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "3",
"enable_single_replica_compaction" = "true" );
+ """
+
+ String[][] tablets = sql """ show tablets from ${tableName}; """
+
+ // wait for update replica infos
+ // be.conf: update_replica_infos_interval_seconds
+ Thread.sleep(20000)
+
+ // find the master be for single replica compaction
+ Boolean found = false
+ String master_backend_id;
+ List<String> follower_backend_id = new ArrayList<>()
+ // The test table only has one bucket with 3 replicas,
+ // and `show tablets` will return 3 different replicas with the same
tablet.
+ // So we can use the same tablet_id to get tablet/trigger compaction
with different backends.
+ String tablet_id = tablets[0][0]
+ for (String[] tablet in tablets) {
+ String trigger_backend_id = tablet[2]
+ def tablet_status =
getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id);
+ def fetchFromPeerValue = tablet_status."fetch from peer"
+
+ if (found && fetchFromPeerValue.contains("-1")) {
+ logger.warn("multipe master");
+ assertTrue(false)
+ }
+ if (fetchFromPeerValue.contains("-1")) {
+ found = true
+ master_backend_id = trigger_backend_id
+ } else {
+ follower_backend_id.add(trigger_backend_id)
+ }
+ }
+
+ def checkCompactionResult = {
+ def master_tablet_status =
getTabletStatus(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id);
+ def master_rowsets = master_tablet_status."rowsets"
+ assert master_rowsets instanceof List
+ logger.info("rowset size: " + master_rowsets.size())
+
+ for (String backend: follower_backend_id) {
+ def tablet_status =
getTabletStatus(backendId_to_backendIP[backend],
backendId_to_backendHttpPort[backend], tablet_id);
+ def rowsets = tablet_status."rowsets"
+ assert rowsets instanceof List
+ assertEquals(master_rowsets.size(), rowsets.size())
+ }
+ }
+
+ sql """ INSERT INTO ${tableName} VALUES (1, "a", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (1, "b", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (2, "a", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (2, "b", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (3, "a", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (3, "b", 100); """
+
+ // trigger master be to do cum compaction
+
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id],
+ "cumulative", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+
+ // trigger follower be to fetch compaction result
+ for (String id in follower_backend_id) {
+ assertTrue(triggerCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id],
+ "cumulative", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id)
+ }
+
+ // check rowsets
+ checkCompactionResult.call()
+
+ sql """ INSERT INTO ${tableName} VALUES (4, "a", 100); """
+ sql """ DELETE FROM ${tableName} WHERE id = 4; """
+ sql """ INSERT INTO ${tableName} VALUES (5, "a", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (6, "a", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (7, "a", 100); """
+ sql """ INSERT INTO ${tableName} VALUES (8, "a", 100); """
+
+ // trigger master be to do cum compaction with delete
+
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id],
+ "cumulative", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+
+ // trigger follower be to fetch compaction result
+ for (String id in follower_backend_id) {
+ assertTrue(triggerCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id],
+ "cumulative", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id)
+ }
+
+ // check rowsets
+ checkCompactionResult.call()
+
+ // trigger master be to do base compaction
+
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id],
+ "base", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+
+ // // trigger follower be to fetch compaction result
+ for (String id in follower_backend_id) {
+ assertTrue(triggerCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id],
+ "base", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id)
+ }
+
+ // check rowsets
+ checkCompactionResult.call()
+
+ qt_sql """
+ select * from ${tableName} order by id
+ """
+
+ } finally {
+ if (has_update_be_config) {
+ set_be_config.call("disable_auto_compaction",
disableAutoCompaction.toString())
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]