This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d5908b0c8b2 [regression](compaction) Add case to test base_compaction 
with dup_key_max_file_size_limit (#27088)
d5908b0c8b2 is described below

commit d5908b0c8b248103912383348a8e34d253b6661c
Author: Xiaocc <[email protected]>
AuthorDate: Fri Nov 24 10:27:51 2023 +0800

    [regression](compaction) Add case to test base_compaction with 
dup_key_max_file_size_limit (#27088)
---
 ...paction_with_dup_key_max_file_size_limit.groovy | 227 +++++++++++++++++++++
 1 file changed, 227 insertions(+)

diff --git 
a/regression-test/suites/compaction/test_base_compaction_with_dup_key_max_file_size_limit.groovy
 
b/regression-test/suites/compaction/test_base_compaction_with_dup_key_max_file_size_limit.groovy
new file mode 100644
index 00000000000..d4c66012954
--- /dev/null
+++ 
b/regression-test/suites/compaction/test_base_compaction_with_dup_key_max_file_size_limit.groovy
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
+    def tableName = "test_base_compaction_with_dup_key_max_file_size_limit"
+ 
+    // use customer table of tpch_sf100
+    def rows = 15000000
+    def load_tpch_sf100_customer = { 
+        def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() 
+        def rowCount = sql "select count(*) from ${tableName}"
+        def s3BucketName = getS3BucketName()
+        def s3WithProperties = """WITH S3 (
+            |"AWS_ACCESS_KEY" = "${getS3AK()}",
+            |"AWS_SECRET_KEY" = "${getS3SK()}",
+            |"AWS_ENDPOINT" = "${getS3Endpoint()}",
+            |"AWS_REGION" = "${getS3Region()}")
+            |PROPERTIES(
+            |"exec_mem_limit" = "8589934592",
+            |"load_parallelism" = "3")""".stripMargin()
+        sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = 
'161061273600')"
+        if (rowCount[0][0] != rows) {
+            def loadLabel = tableName + "_" + uniqueID
+            // load data from cos
+            def loadSql = """
+            LOAD LABEL ${loadLabel}(
+                DATA 
INFILE("s3://${s3BucketName}/regression/tpch/sf100/customer.tbl")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY "|"
+                (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment, temp)
+            )
+            """
+            loadSql = loadSql + s3WithProperties
+            sql loadSql
+
+            // check load state
+            while (true) {
+                def stateResult = sql "show load where Label = '${loadLabel}'"
+                logger.info("load result is ${stateResult}")
+                def loadState = stateResult[stateResult.size() - 
1][2].toString()
+                if ("CANCELLED".equalsIgnoreCase(loadState)) {
+                    throw new IllegalStateException("load ${loadLabel} 
failed.")
+                } else if ("FINISHED".equalsIgnoreCase(loadState)) {
+                    rows += 15000000
+                    break
+                }
+                sleep(5000)
+            }
+        } 
+    }
+    try {
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
+        
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + 
err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        boolean disableAutoCompaction = true
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) 
ele)[2])
+            }
+        }
+
+        def triggerCompaction = { be_host, be_http_port, compact_type, 
tablet_id ->
+            // trigger compactions for all tablets in ${tableName}
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X POST http://${be_host}:${be_http_port}";)
+            sb.append("/api/compaction/run?tablet_id=")
+            sb.append(tablet_id)
+            sb.append("&compact_type=${compact_type}")
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+            out = process.getText()
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
+            if (!disableAutoCompaction) {
+                return "Success, " + out
+            }
+            assertEquals(code, 0)
+            return out
+        } 
+
+        def waitForCompaction = { be_host, be_http_port, tablet_id ->
+            // wait for all compactions done
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                StringBuilder sb = new StringBuilder();
+                sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+                sb.append("/api/compaction/run_status?tablet_id=")
+                sb.append(tablet_id)
+
+                String command = sb.toString()
+                logger.info(command)
+                process = command.execute()
+                code = process.waitFor()
+                out = process.getText()
+                logger.info("Get compaction status: code=" + code + ", out=" + 
out)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        sql """ DROP TABLE IF EXISTS ${tableName}; """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+            C_CUSTKEY     INTEGER NOT NULL,
+            C_NAME        VARCHAR(25) NOT NULL,
+            C_ADDRESS     VARCHAR(40) NOT NULL,
+            C_NATIONKEY   INTEGER NOT NULL,
+            C_PHONE       CHAR(15) NOT NULL,
+            C_ACCTBAL     DECIMAL(15,2)   NOT NULL,
+            C_MKTSEGMENT  CHAR(10) NOT NULL,
+            C_COMMENT     VARCHAR(117) NOT NULL
+            )
+            DUPLICATE KEY(C_CUSTKEY, C_NAME)
+            DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
+            PROPERTIES (
+            "replication_num" = "1", "disable_auto_compaction" = "true"
+            )
+        """ 
+
+        def tablet = (sql """ show tablets from ${tableName}; """)[0]
+        String tablet_id = tablet[0]
+        String trigger_backend_id = tablet[2]
+
+        // rowsets:
+        //      [0-1] 0
+        //      [2-2] 1G overlapping
+        // cp: -1
+        load_tpch_sf100_customer.call();
+
+        // rowsets:
+        //      [0-1] 0
+        //      [2-2] 1G nooverlapping
+        // cp: 3
+        
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id],
+                    "cumulative", tablet_id).contains("Success"));
+        waitForCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+
+
+        // rowsets:
+        //      [0-1] 0
+        //      [2-2] 1G nooverlapping
+        //      [3-3] 1G overlapping
+        // cp: 3
+        load_tpch_sf100_customer.call();
+
+        // rowsets:
+        //      [0-1] 0
+        //      [2-2] 1G nooverlapping
+        //      [3-3] 1G nooverlapping
+        // cp: 4 
+        
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id],
+                    "cumulative", tablet_id).contains("Success"));
+        waitForCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+
+
+        // The conditions for base compaction have been satisfied.
+        // Since the size of first input rowset is 0, there is no file size 
limitation. (maybe fix it?)
+        // rowsets:
+        //      [0-3] 2G nooverlapping
+        // cp: 4 
+        
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id],
+                    "base", tablet_id).contains("Success"));
+        waitForCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id) 
+
+
+        // rowsets:
+        //      [0-3] 2G nooverlapping
+        //      [4-4] 1G overlapping
+        // cp: 4
+        load_tpch_sf100_customer.call();
+
+        // rowsets:
+        //      [0-3] 2G nooverlapping
+        //      [4-4] 1G nooverlapping
+        // cp: 5
+        
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id],
+                    "cumulative", tablet_id).contains("Success"));
+        waitForCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+
+
+        // Due to the limit of 
config::base_compaction_dup_key_max_file_size_mbytes(1G),
+        // can not do base compaction, return E-808
+        // rowsets:
+        //      [0-3] 2G nooverlapping
+        //      [4-4] 1G nooverlapping
+        // cp: 5
+        
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id],
+                    "base", tablet_id).contains("E-808"));
+
+        def rowCount = sql "select count(*) from ${tableName}"
+        assertTrue(rowCount[0][0] != rows)
+    } finally {
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to