This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 17cf4ab2c13 [case](regression) streamload publish timeout (#29457)
17cf4ab2c13 is described below
commit 17cf4ab2c1333ef75c0631910ae87ee70a280b74
Author: HowardQin <[email protected]>
AuthorDate: Sun Jan 7 19:50:16 2024 +0800
[case](regression) streamload publish timeout (#29457)
Co-authored-by: qinhao <[email protected]>
---
.../load_p0/stream_load/test_publish_timeout.out | 4 +
.../stream_load/test_publish_timeout.groovy | 111 +++++++++++++++++++++
2 files changed, 115 insertions(+)
diff --git a/regression-test/data/load_p0/stream_load/test_publish_timeout.out
b/regression-test/data/load_p0/stream_load/test_publish_timeout.out
new file mode 100644
index 00000000000..1db8882f884
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_publish_timeout.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+2500
+
diff --git
a/regression-test/suites/load_p0/stream_load/test_publish_timeout.groovy
b/regression-test/suites/load_p0/stream_load/test_publish_timeout.groovy
new file mode 100644
index 00000000000..c5853829619
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_publish_timeout.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_publish_timeout", "nonConcurrent") {
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def get_be_param = { paramName ->
+ // assuming paramName on all BEs have save value
+ String backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == paramName) {
+ return ((List<String>) ele)[2]
+ }
+ }
+ }
+
+ def set_be_param = { paramName, paramValue ->
+ // for eache BE node, set paramName=paramValue
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ String saved_txn_commit_rpc_timeout_ms =
get_be_param.call("txn_commit_rpc_timeout_ms")
+
+ log.info("Old txn_commit_rpc_timeout_ms value is :
${saved_txn_commit_rpc_timeout_ms}".toString())
+
+ // Setting txn_commit_rpc_timeout_ms =< 0 will trigger publish timeout
+ set_be_param.call("txn_commit_rpc_timeout_ms", "0")
+
+ log.info("New txn_commit_rpc_timeout_ms value is : 0".toString())
+
+ def testTable = "all_types"
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `k3` smallint(6) NULL,
+ `k4` bigint(20) NULL,
+ `k5` largeint(40) NULL,
+ `k6` float NULL,
+ `k7` double NULL,
+ `k8` decimal(9, 0) NULL,
+ `k9` char(10) NULL,
+ `k10` varchar(1024) NULL,
+ `k11` text NULL,
+ `k12` date NULL,
+ `k13` datetime NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ streamLoad {
+ table "${testTable}"
+ // set http request header params
+ set 'label', "test_publish_timeout_" + UUID.randomUUID().toString()
+ set 'format', 'csv'
+ set 'column_separator', ','
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertTrue(json.Message.contains("PUBLISH_TIMEOUT"))
+ assertEquals(2500, json.NumberTotalRows)
+ assertEquals(2500, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql "SELECT COUNT(*) FROM ${testTable}"
+ } finally {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ }
+ set_be_param.call("txn_commit_rpc_timeout_ms",
saved_txn_commit_rpc_timeout_ms)
+ log.info("Set txn_commit_rpc_timeout_ms value to :
${saved_txn_commit_rpc_timeout_ms}".toString())
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]