This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 0196abe0c07 [feature](regression test) stream load support direct load
to be #26829 # (#31183)
0196abe0c07 is described below
commit 0196abe0c0739fbb7c3df9a9b131c2237bdf8948
Author: yujun <[email protected]>
AuthorDate: Sat May 4 09:41:09 2024 +0800
[feature](regression test) stream load support direct load to be #26829 #
(#31183)
---
regression-test/data/demo_p0/streamLoad_action.out | 114 ++++++++++++++++++++-
.../regression/action/StreamLoadAction.groovy | 30 ++++--
.../suites/chaos/test_docker_example.groovy | 51 ---------
.../suites/demo_p0/streamLoad_action.groovy | 18 ++++
4 files changed, 150 insertions(+), 63 deletions(-)
diff --git a/regression-test/data/demo_p0/streamLoad_action.out
b/regression-test/data/demo_p0/streamLoad_action.out
index 445a11c6dc6..40634eadc3d 100644
--- a/regression-test/data/demo_p0/streamLoad_action.out
+++ b/regression-test/data/demo_p0/streamLoad_action.out
@@ -1,8 +1,118 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_1 --
+1 BeiJing
+1 BeiJing
+2 ShangHai
+2 ShangHai
+3 GuangZhou
+3 GuangZhou
+
+-- !select_2 --
+0 a_0
+1 a_1
+10 a_10
+11 a_11
+12 a_12
+13 a_13
+14 a_14
+15 a_15
+16 a_16
+17 a_17
+18 a_18
+19 a_19
+2 a_2
+20 a_20
+21 a_21
+22 a_22
+23 a_23
+24 a_24
+25 a_25
+26 a_26
+27 a_27
+28 a_28
+29 a_29
+3 a_3
+30 a_30
+31 a_31
+32 a_32
+33 a_33
+34 a_34
+35 a_35
+36 a_36
+37 a_37
+38 a_38
+39 a_39
+4 a_4
+40 a_40
+41 a_41
+42 a_42
+43 a_43
+44 a_44
+45 a_45
+46 a_46
+47 a_47
+48 a_48
+49 a_49
+5 a_5
+50 a_50
+51 a_51
+52 a_52
+53 a_53
+54 a_54
+55 a_55
+56 a_56
+57 a_57
+58 a_58
+59 a_59
+6 a_6
+60 a_60
+61 a_61
+62 a_62
+63 a_63
+64 a_64
+65 a_65
+66 a_66
+67 a_67
+68 a_68
+69 a_69
+7 a_7
+70 a_70
+71 a_71
+72 a_72
+73 a_73
+74 a_74
+75 a_75
+76 a_76
+77 a_77
+78 a_78
+79 a_79
+8 a_8
+80 a_80
+81 a_81
+82 a_82
+83 a_83
+84 a_84
+85 a_85
+86 a_86
+87 a_87
+88 a_88
+89 a_89
+9 a_9
+90 a_90
+91 a_91
+92 a_92
+93 a_93
+94 a_94
+95 a_95
+96 a_96
+97 a_97
+98 a_98
+99 a_99
+
-- !sql --
0
2
-2
-3
3
+4
+5
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index 04a2b7979b4..e6681649b42 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -40,7 +40,7 @@ import org.junit.Assert
@Slf4j
class StreamLoadAction implements SuiteAction {
- public final InetSocketAddress address
+ public InetSocketAddress address
public final String user
public final String password
String db
@@ -53,6 +53,7 @@ class StreamLoadAction implements SuiteAction {
Closure check
Map<String, String> headers
SuiteContext context
+ boolean directToBe = false
StreamLoadAction(SuiteContext context) {
this.address = context.getFeHttpAddress()
@@ -83,6 +84,11 @@ class StreamLoadAction implements SuiteAction {
this.table = table.call()
}
+ void directToBe(String beHost, int beHttpPort) {
+ this.address = new InetSocketAddress(beHost, beHttpPort)
+ this.directToBe = true
+ }
+
void inputStream(InputStream inputStream) {
this.inputStream = inputStream
}
@@ -136,21 +142,28 @@ class StreamLoadAction implements SuiteAction {
String responseText = null
Throwable ex = null
long startTime = System.currentTimeMillis()
+ def isHttpStream = headers.containsKey("version")
try {
- def uri =
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
+ def uri = isHttpStream ?
"http://${address.hostString}:${address.port}/api/_http_stream"
+ :
"http://${address.hostString}:${address.port}/api/${db}/${table}/_stream_load"
HttpClients.createDefault().withCloseable { client ->
RequestBuilder requestBuilder =
prepareRequestHeader(RequestBuilder.put(uri))
HttpEntity httpEntity = prepareHttpEntity(client)
- String beLocation = streamLoadToFe(client, requestBuilder)
- responseText = streamLoadToBe(client, requestBuilder,
beLocation, httpEntity)
+ if (!directToBe) {
+ String beLocation = streamLoadToFe(client, requestBuilder)
+ log.info("Redirect stream load to
${beLocation}".toString())
+ requestBuilder.setUri(beLocation)
+ }
+ requestBuilder.setEntity(httpEntity)
+ responseText = streamLoadToBe(client, requestBuilder)
}
} catch (Throwable t) {
ex = t
}
long endTime = System.currentTimeMillis()
- log.info("Stream load elapsed ${endTime - startTime} ms, response:
${responseText}".toString() +
- ex.toString())
+ log.info("Stream load elapsed ${endTime - startTime} ms, is http
stream: ${isHttpStream}, " +
+ " response: ${responseText}" + ex.toString())
checkResult(responseText, ex, startTime, endTime)
}
@@ -266,10 +279,7 @@ class StreamLoadAction implements SuiteAction {
return backendStreamLoadUri
}
- private String streamLoadToBe(CloseableHttpClient client, RequestBuilder
requestBuilder, String beLocation, HttpEntity httpEntity) {
- log.info("Redirect stream load to ${beLocation}".toString())
- requestBuilder.setUri(beLocation)
- requestBuilder.setEntity(httpEntity)
+ private String streamLoadToBe(CloseableHttpClient client, RequestBuilder
requestBuilder) {
String responseText
try{
client.execute(requestBuilder.build()).withCloseable { resp ->
diff --git a/regression-test/suites/chaos/test_docker_example.groovy
b/regression-test/suites/chaos/test_docker_example.groovy
deleted file mode 100644
index e63fd5edbc9..00000000000
--- a/regression-test/suites/chaos/test_docker_example.groovy
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import org.apache.doris.regression.suite.ClusterOptions
-
-suite("test_docker_example") {
- docker {
- sql """create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10"""
- sql """insert into tb1 values (1),(2),(3)"""
-
- cluster.checkBeIsAlive(2, true)
-
- // stop backend 2, 3
- cluster.stopBackends(2, 3)
- // wait be lost heartbeat
- Thread.sleep(6000)
-
- cluster.checkBeIsAlive(2, false)
-
- test {
- sql """insert into tb1 values (4),(5),(6)"""
-
- // REPLICA_FEW_ERR
- exception "errCode = 3,"
- }
- }
-
- // run another docker
- def options = new ClusterOptions()
- // add fe config items
- options.feConfigs += ["example_conf_k1=v1", "example_conf_k2=v2"]
- // contains 5 backends
- options.beNum = 5
- docker (options) {
- sql """create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10
properties ("replication_num"="5")"""
- }
-}
diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy
b/regression-test/suites/demo_p0/streamLoad_action.groovy
index 7ed9c375c47..a11aed7a1c5 100644
--- a/regression-test/suites/demo_p0/streamLoad_action.groovy
+++ b/regression-test/suites/demo_p0/streamLoad_action.groovy
@@ -53,6 +53,22 @@ suite("streamLoad_action") {
// stream load action will check result, include Success status, and
NumberTotalRows == NumberLoadedRows
}
+ def backendIps = [:]
+ def backendHttpPorts = [:]
+ getBackendIpHttpPort(backendIps, backendHttpPorts)
+ def backendId = backendIps.keySet()[0]
+ streamLoad {
+ table tableName
+ set 'column_separator', ','
+ file 'streamload_input.csv'
+
+ // can direct to backend, then this backend is the txn coordinator.
+ directToBe backendIps.get(backendId),
backendHttpPorts.get(backendId) as int
+ }
+
+ order_qt_select_1 "SELECT * FROM ${tableName}"
+ sql "TRUNCATE TABLE ${tableName}"
+
// stream load 100 rows
def rowCount = 100
// range: [0, rowCount)
@@ -80,6 +96,8 @@ suite("streamLoad_action") {
}
}
+ order_qt_select_2 "SELECT * FROM ${tableName}"
+
// to test merge sort
sql """ DROP TABLE IF EXISTS B """
sql """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]