This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 0196abe0c07 [feature](regression test) stream load support direct load 
to be #26829 # (#31183)
0196abe0c07 is described below

commit 0196abe0c0739fbb7c3df9a9b131c2237bdf8948
Author: yujun <[email protected]>
AuthorDate: Sat May 4 09:41:09 2024 +0800

    [feature](regression test) stream load support direct load to be #26829 # 
(#31183)
---
 regression-test/data/demo_p0/streamLoad_action.out | 114 ++++++++++++++++++++-
 .../regression/action/StreamLoadAction.groovy      |  30 ++++--
 .../suites/chaos/test_docker_example.groovy        |  51 ---------
 .../suites/demo_p0/streamLoad_action.groovy        |  18 ++++
 4 files changed, 150 insertions(+), 63 deletions(-)

diff --git a/regression-test/data/demo_p0/streamLoad_action.out 
b/regression-test/data/demo_p0/streamLoad_action.out
index 445a11c6dc6..40634eadc3d 100644
--- a/regression-test/data/demo_p0/streamLoad_action.out
+++ b/regression-test/data/demo_p0/streamLoad_action.out
@@ -1,8 +1,118 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_1 --
+1      BeiJing
+1      BeiJing
+2      ShangHai
+2      ShangHai
+3      GuangZhou
+3      GuangZhou
+
+-- !select_2 --
+0      a_0
+1      a_1
+10     a_10
+11     a_11
+12     a_12
+13     a_13
+14     a_14
+15     a_15
+16     a_16
+17     a_17
+18     a_18
+19     a_19
+2      a_2
+20     a_20
+21     a_21
+22     a_22
+23     a_23
+24     a_24
+25     a_25
+26     a_26
+27     a_27
+28     a_28
+29     a_29
+3      a_3
+30     a_30
+31     a_31
+32     a_32
+33     a_33
+34     a_34
+35     a_35
+36     a_36
+37     a_37
+38     a_38
+39     a_39
+4      a_4
+40     a_40
+41     a_41
+42     a_42
+43     a_43
+44     a_44
+45     a_45
+46     a_46
+47     a_47
+48     a_48
+49     a_49
+5      a_5
+50     a_50
+51     a_51
+52     a_52
+53     a_53
+54     a_54
+55     a_55
+56     a_56
+57     a_57
+58     a_58
+59     a_59
+6      a_6
+60     a_60
+61     a_61
+62     a_62
+63     a_63
+64     a_64
+65     a_65
+66     a_66
+67     a_67
+68     a_68
+69     a_69
+7      a_7
+70     a_70
+71     a_71
+72     a_72
+73     a_73
+74     a_74
+75     a_75
+76     a_76
+77     a_77
+78     a_78
+79     a_79
+8      a_8
+80     a_80
+81     a_81
+82     a_82
+83     a_83
+84     a_84
+85     a_85
+86     a_86
+87     a_87
+88     a_88
+89     a_89
+9      a_9
+90     a_90
+91     a_91
+92     a_92
+93     a_93
+94     a_94
+95     a_95
+96     a_96
+97     a_97
+98     a_98
+99     a_99
+
 -- !sql --
 0
 2
-2
-3
 3
+4
+5
 
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index 04a2b7979b4..e6681649b42 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -40,7 +40,7 @@ import org.junit.Assert
 
 @Slf4j
 class StreamLoadAction implements SuiteAction {
-    public final InetSocketAddress address
+    public InetSocketAddress address
     public final String user
     public final String password
     String db
@@ -53,6 +53,7 @@ class StreamLoadAction implements SuiteAction {
     Closure check
     Map<String, String> headers
     SuiteContext context
+    boolean directToBe = false
 
     StreamLoadAction(SuiteContext context) {
         this.address = context.getFeHttpAddress()
@@ -83,6 +84,11 @@ class StreamLoadAction implements SuiteAction {
         this.table = table.call()
     }
 
+    void directToBe(String beHost, int beHttpPort) {
+        this.address = new InetSocketAddress(beHost, beHttpPort)
+        this.directToBe = true
+    }
+
     void inputStream(InputStream inputStream) {
         this.inputStream = inputStream
     }
@@ -136,21 +142,28 @@ class StreamLoadAction implements SuiteAction {
         String responseText = null
         Throwable ex = null
         long startTime = System.currentTimeMillis()
+        def isHttpStream = headers.containsKey("version")
         try {
-            def uri = 
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load";
+            def uri = isHttpStream ? 
"http://${address.hostString}:${address.port}/api/_http_stream";
+                    : 
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load";
             HttpClients.createDefault().withCloseable { client ->
                 RequestBuilder requestBuilder = 
prepareRequestHeader(RequestBuilder.put(uri))
                 HttpEntity httpEntity = prepareHttpEntity(client)
-                String beLocation = streamLoadToFe(client, requestBuilder)
-                responseText = streamLoadToBe(client, requestBuilder, 
beLocation, httpEntity)
+                if (!directToBe) {
+                    String beLocation = streamLoadToFe(client, requestBuilder)
+                    log.info("Redirect stream load to 
${beLocation}".toString())
+                    requestBuilder.setUri(beLocation)
+                }
+                requestBuilder.setEntity(httpEntity)
+                responseText = streamLoadToBe(client, requestBuilder)
             }
         } catch (Throwable t) {
             ex = t
         }
         long endTime = System.currentTimeMillis()
 
-        log.info("Stream load elapsed ${endTime - startTime} ms, response: 
${responseText}".toString() +
-                 ex.toString())
+        log.info("Stream load elapsed ${endTime - startTime} ms, is http 
stream: ${isHttpStream}, " +
+                " response: ${responseText}" + ex.toString())
         checkResult(responseText, ex, startTime, endTime)
     }
 
@@ -266,10 +279,7 @@ class StreamLoadAction implements SuiteAction {
         return backendStreamLoadUri
     }
 
-    private String streamLoadToBe(CloseableHttpClient client, RequestBuilder 
requestBuilder, String beLocation, HttpEntity httpEntity) {
-        log.info("Redirect stream load to ${beLocation}".toString())
-        requestBuilder.setUri(beLocation)
-        requestBuilder.setEntity(httpEntity)
+    private String streamLoadToBe(CloseableHttpClient client, RequestBuilder 
requestBuilder) {
         String responseText
         try{
             client.execute(requestBuilder.build()).withCloseable { resp ->
diff --git a/regression-test/suites/chaos/test_docker_example.groovy 
b/regression-test/suites/chaos/test_docker_example.groovy
deleted file mode 100644
index e63fd5edbc9..00000000000
--- a/regression-test/suites/chaos/test_docker_example.groovy
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.apache.doris.regression.suite.ClusterOptions
-
-suite("test_docker_example") {
-    docker {
-        sql """create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10"""
-        sql """insert into tb1 values (1),(2),(3)"""
-
-        cluster.checkBeIsAlive(2, true)
-
-        // stop backend 2, 3
-        cluster.stopBackends(2, 3)
-        // wait be lost heartbeat
-        Thread.sleep(6000)
-
-        cluster.checkBeIsAlive(2, false)
-
-        test {
-            sql """insert into tb1 values (4),(5),(6)"""
-
-            // REPLICA_FEW_ERR
-            exception "errCode = 3,"
-        }
-    }
-
-    // run another docker
-    def options = new ClusterOptions()
-    // add fe config items
-    options.feConfigs += ["example_conf_k1=v1", "example_conf_k2=v2"]
-    // contains 5 backends
-    options.beNum = 5
-    docker (options) {
-        sql """create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 
properties ("replication_num"="5")"""
-    }
-}
diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy 
b/regression-test/suites/demo_p0/streamLoad_action.groovy
index 7ed9c375c47..a11aed7a1c5 100644
--- a/regression-test/suites/demo_p0/streamLoad_action.groovy
+++ b/regression-test/suites/demo_p0/streamLoad_action.groovy
@@ -53,6 +53,22 @@ suite("streamLoad_action") {
         // stream load action will check result, include Success status, and 
NumberTotalRows == NumberLoadedRows
     }
 
+    def backendIps = [:]
+    def backendHttpPorts = [:]
+    getBackendIpHttpPort(backendIps, backendHttpPorts)
+    def backendId = backendIps.keySet()[0]
+    streamLoad {
+        table tableName
+        set 'column_separator', ','
+        file 'streamload_input.csv'
+
+        // can direct to backend, then this backend is the txn coordinator.
+        directToBe  backendIps.get(backendId),  
backendHttpPorts.get(backendId) as int
+    }
+
+    order_qt_select_1 "SELECT * FROM ${tableName}"
+    sql "TRUNCATE TABLE ${tableName}"
+
     // stream load 100 rows
     def rowCount = 100
     // range: [0, rowCount)
@@ -80,6 +96,8 @@ suite("streamLoad_action") {
         }
     }
 
+    order_qt_select_2 "SELECT * FROM ${tableName}"
+
     // to test merge sort
     sql """ DROP TABLE IF EXISTS B """
     sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to