This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 650eabaa93b192c93439783c54c75d857b83bc41
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Sep 28 22:34:59 2023 +0800

    [Fix](regression-test) Fix stream load 2pc case (#24925)
---
 .../load_p0/stream_load/test_stream_load.groovy    | 139 +++++++++++++++++++++
 .../stream_load/test_stream_load_properties.groovy |  24 +++-
 2 files changed, 161 insertions(+), 2 deletions(-)

diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 90e9c750bca..968c2e56108 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -942,5 +942,144 @@ suite("test_stream_load", "p0") {
     // parse k1 default value
     assertEquals(res[0][0], 1)
     assertEquals(res[1][0], 1)
+
+    // test two phase commit
+    def tableName15 = "test_two_phase_commit"
+    InetSocketAddress address = context.config.feHttpInetSocketAddress
+    String user = context.config.feHttpUser
+    String password = context.config.feHttpPassword
+    String db = context.config.getDbNameByFile(context.file)
+
+    def do_streamload_2pc = { label, txn_operation ->
+        HttpClients.createDefault().withCloseable { client ->
+            RequestBuilder requestBuilder = 
RequestBuilder.put("http://${address.hostString}:${address.port}/api/${db}/${tableName15}/_stream_load_2pc";)
+            String encoding = Base64.getEncoder()
+                .encodeToString((user + ":" + (password == null ? "" : 
password)).getBytes("UTF-8"))
+            requestBuilder.setHeader("Authorization", "Basic ${encoding}")
+            requestBuilder.setHeader("Expect", "100-Continue")
+            requestBuilder.setHeader("label", "${label}")
+            requestBuilder.setHeader("txn_operation", "${txn_operation}")
+
+            String backendStreamLoadUri = null
+            client.execute(requestBuilder.build()).withCloseable { resp ->
+                resp.withCloseable {
+                    String body = EntityUtils.toString(resp.getEntity())
+                    def respCode = resp.getStatusLine().getStatusCode()
+                    // should redirect to backend
+                    if (respCode != 307) {
+                        throw new IllegalStateException("Expect frontend 
stream load response code is 307, " +
+                                "but meet ${respCode}\nbody: ${body}")
+                    }
+                    backendStreamLoadUri = 
resp.getFirstHeader("location").getValue()
+                }
+            }
+
+            requestBuilder.setUri(backendStreamLoadUri)
+            try{
+                client.execute(requestBuilder.build()).withCloseable { resp ->
+                    resp.withCloseable {
+                        String body = EntityUtils.toString(resp.getEntity())
+                        def respCode = resp.getStatusLine().getStatusCode()
+                        if (respCode != 200) {
+                            throw new IllegalStateException("Expect backend 
stream load response code is 200, " +
+                                    "but meet ${respCode}\nbody: ${body}")
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                log.info("StreamLoad Exception: ", t)
+            }
+        }
+    }
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName15} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName15} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def label = UUID.randomUUID().toString().replaceAll("-", "") 
+        streamLoad {
+            table "${tableName15}"
+
+            set 'label', "${label}"
+            set 'column_separator', '|'
+            set 'columns', 'k1, k2, v1, v2, v3'
+            set 'two_phase_commit', 'true'
+
+            file 'test_two_phase_commit.csv'
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(2, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+            }
+        }
+
+        qt_sql_2pc "select * from ${tableName15} order by k1"
+
+        do_streamload_2pc.call(label, "abort")
+
+        qt_sql_2pc_abort "select * from ${tableName15} order by k1"
+
+        streamLoad {
+            table "${tableName15}"
+
+            set 'label', "${label}"
+            set 'column_separator', '|'
+            set 'columns', 'k1, k2, v1, v2, v3'
+            set 'two_phase_commit', 'true'
+
+            file 'test_two_phase_commit.csv'
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(2, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+            }
+        }
+
+        do_streamload_2pc.call(label, "commit")
+        
+        def count = 0
+        while (true) {
+            res = sql "select count(*) from ${tableName15}"
+            if (res[0][0] > 0) {
+                break
+            }
+            if (count >= 60) {
+                log.error("stream load commit can not visible for long time")
+                assertEquals(2, res[0][0])
+                break
+            }
+            sleep(1000)
+            count++
+        }
+
+        qt_sql_2pc_commit "select * from ${tableName15} order by k1"
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName15} FORCE"""
+    }
 }
 
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy
index 2b8d56a7a47..19e707f3851 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy
@@ -559,7 +559,26 @@ suite("test_stream_load_properties", "p0") {
 
             do_streamload_2pc.call(txnId, "commit", tableName1)
 
-            sleep(60)
+            def count = 0
+            while (true) {
+                def res
+                if (i <= 3) {
+                    res = sql "select count(*) from ${tableName1}"
+                } else {
+                    res = sql "select count(*) from ${tableName1}"
+                }
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 60) {
+                    log.error("stream load commit can not visible for long 
time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            
             if (i <= 3) {
                 qt_sql_2pc_commit "select * from ${tableName1} order by 
k00,k01"
             } else {
@@ -570,7 +589,8 @@ suite("test_stream_load_properties", "p0") {
         }
     } finally {
         for (String tableName in tables) {
-            sql new 
File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+            def tableName1 =  "stream_load_" + tableName
+            sql "DROP TABLE IF EXISTS ${tableName1} FORCE"
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to