This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 97105e9a16f [regression](compaction) Add case to test single replica 
compaction (#27199)
97105e9a16f is described below

commit 97105e9a16fb5df8d5e1c3885bff8a815f51d400
Author: Xiaocc <[email protected]>
AuthorDate: Thu Nov 30 21:27:13 2023 +0800

    [regression](compaction) Add case to test single replica compaction (#27199)
---
 be/src/olap/tablet.cpp                             |  14 ++
 .../compaction/test_single_replica_compaction.out  |  10 +
 .../test_single_replica_compaction.groovy          | 248 +++++++++++++++++++++
 3 files changed, 272 insertions(+)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index fad40012b0a..bbd57cf1b51 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1449,6 +1449,20 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
                                            root.GetAllocator());
     root.AddMember("last base status", base_compaction_status_value, 
root.GetAllocator());
 
+    TReplicaInfo replica_info;
+    std::string dummp_token;
+    rapidjson::Value fetch_addr;
+    if (tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
+        StorageEngine::instance()->get_peer_replica_info(tablet_id(), 
&replica_info,
+                                                         &dummp_token)) {
+        std::string addr = replica_info.host + ":" + 
std::to_string(replica_info.brpc_port);
+        fetch_addr.SetString(addr.c_str(), addr.length(), root.GetAllocator());
+    } else {
+        // -1 means do compaction locally
+        fetch_addr.SetString("-1", root.GetAllocator());
+    }
+    root.AddMember("fetch from peer", fetch_addr, root.GetAllocator());
+
     // print all rowsets' version as an array
     rapidjson::Document versions_arr;
     rapidjson::Document missing_versions_arr;
diff --git a/regression-test/data/compaction/test_single_replica_compaction.out 
b/regression-test/data/compaction/test_single_replica_compaction.out
new file mode 100644
index 00000000000..9895dbe9e11
--- /dev/null
+++ b/regression-test/data/compaction/test_single_replica_compaction.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      b       100
+2      b       100
+3      b       100
+5      a       100
+6      a       100
+7      a       100
+8      a       100
+
diff --git 
a/regression-test/suites/compaction/test_single_replica_compaction.groovy 
b/regression-test/suites/compaction/test_single_replica_compaction.groovy
new file mode 100644
index 00000000000..4e3fba73660
--- /dev/null
+++ b/regression-test/suites/compaction/test_single_replica_compaction.groovy
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_single_replica_compaction", "p2") {
+    def tableName = "test_single_replica_compaction"
+  
+    def set_be_config = { key, value ->
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = 
update_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", 
err=" + err)
+        }
+    }
+
+    boolean disableAutoCompaction = true
+    boolean has_update_be_config = false
+    try {
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
+        
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + 
err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) 
ele)[2])
+            }
+        }
+        set_be_config.call("disable_auto_compaction", "true")
+        has_update_be_config = true
+
+        def triggerCompaction = { be_host, be_http_port, compact_type, 
tablet_id ->
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X POST http://${be_host}:${be_http_port}";)
+            sb.append("/api/compaction/run?tablet_id=")
+            sb.append(tablet_id)
+            sb.append("&compact_type=${compact_type}")
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+            out = process.getText()
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
+            if (!disableAutoCompaction) {
+                return "Success, " + out
+            }
+            assertEquals(code, 0)
+            return out
+        } 
+
+        def waitForCompaction = { be_host, be_http_port, tablet_id ->
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                StringBuilder sb = new StringBuilder();
+                sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+                sb.append("/api/compaction/run_status?tablet_id=")
+                sb.append(tablet_id)
+
+                String command = sb.toString()
+                logger.info(command)
+                process = command.execute()
+                code = process.waitFor()
+                out = process.getText()
+                logger.info("Get compaction status: code=" + code + ", out=" + 
out)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        def getTabletStatus = { be_host, be_http_port, tablet_id ->
+            boolean running = true
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+            sb.append("/api/compaction/show?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            out = process.getText()
+            logger.info("Get tablet status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def tabletStatus = parseJson(out.trim())
+            return tabletStatus
+        }
+
+
+        sql """ DROP TABLE IF EXISTS ${tableName}; """
+        sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NULL,
+                `name` varchar(255) NULL,
+                `score` int(11) NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "3", 
"enable_single_replica_compaction" = "true" );
+        """
+
+        String[][] tablets = sql """ show tablets from ${tableName}; """
+
+        // wait for update replica infos
+        // be.conf: update_replica_infos_interval_seconds
+        Thread.sleep(20000)
+        
+        // find the master be for single replica compaction
+        Boolean found = false
+        String master_backend_id;
+        List<String> follower_backend_id = new ArrayList<>()
+        // The test table only has one bucket with 3 replicas,
+        // and `show tablets` will return 3 different replicas with the same 
tablet.
+        // So we can use the same tablet_id to get tablet/trigger compaction 
with different backends.
+        String tablet_id = tablets[0][0]
+        for (String[] tablet in tablets) {
+            String trigger_backend_id = tablet[2]
+            def tablet_status = 
getTabletStatus(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id);
+            def fetchFromPeerValue = tablet_status."fetch from peer"
+
+            if (found && fetchFromPeerValue.contains("-1")) {
+                logger.warn("multipe master");
+                assertTrue(false)
+            }
+            if (fetchFromPeerValue.contains("-1")) {
+                found = true
+                master_backend_id = trigger_backend_id
+            } else {
+                follower_backend_id.add(trigger_backend_id)
+            }
+        }
+
+        def checkCompactionResult = {
+            def master_tablet_status = 
getTabletStatus(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id], tablet_id);
+            def master_rowsets = master_tablet_status."rowsets"
+            assert master_rowsets instanceof List
+            logger.info("rowset size: " + master_rowsets.size())
+
+            for (String backend: follower_backend_id) {
+                def tablet_status = 
getTabletStatus(backendId_to_backendIP[backend], 
backendId_to_backendHttpPort[backend], tablet_id);
+                def rowsets = tablet_status."rowsets"
+                assert rowsets instanceof List
+                assertEquals(master_rowsets.size(), rowsets.size())
+            }
+        }
+
+        sql """ INSERT INTO ${tableName} VALUES (1, "a", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (1, "b", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "a", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "b", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "a", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "b", 100); """
+
+        // trigger master be to do cum compaction
+        
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id],
+                    "cumulative", tablet_id).contains("Success")); 
+        waitForCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+
+        // trigger follower be to fetch compaction result
+        for (String id in follower_backend_id) {
+            assertTrue(triggerCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id],
+                    "cumulative", tablet_id).contains("Success")); 
+            waitForCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id], tablet_id)
+        }
+
+        // check rowsets
+        checkCompactionResult.call()
+
+        sql """ INSERT INTO ${tableName} VALUES (4, "a", 100); """
+        sql """ DELETE FROM ${tableName} WHERE id = 4; """
+        sql """ INSERT INTO ${tableName} VALUES (5, "a", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (6, "a", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (7, "a", 100); """
+        sql """ INSERT INTO ${tableName} VALUES (8, "a", 100); """
+
+        // trigger master be to do cum compaction with delete
+        
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id],
+                    "cumulative", tablet_id).contains("Success")); 
+        waitForCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+
+        // trigger follower be to fetch compaction result
+        for (String id in follower_backend_id) {
+            assertTrue(triggerCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id],
+                    "cumulative", tablet_id).contains("Success")); 
+            waitForCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id], tablet_id)
+        }
+
+        // check rowsets
+        checkCompactionResult.call()
+
+        // trigger master be to do base compaction
+        
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id],
+                    "base", tablet_id).contains("Success")); 
+        waitForCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+
+        // // trigger follower be to fetch compaction result
+        for (String id in follower_backend_id) {
+            assertTrue(triggerCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id],
+                    "base", tablet_id).contains("Success")); 
+            waitForCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id], tablet_id)
+        }
+
+        // check rowsets
+        checkCompactionResult.call()
+
+        qt_sql """
+        select * from  ${tableName} order by id
+        """
+  
+    } finally {
+        if (has_update_be_config) {
+            set_be_config.call("disable_auto_compaction", 
disableAutoCompaction.toString())
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to