This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 650eabaa93b192c93439783c54c75d857b83bc41 Author: HHoflittlefish777 <[email protected]> AuthorDate: Thu Sep 28 22:34:59 2023 +0800 [Fix](regression-test) Fix stream load 2pc case (#24925) --- .../load_p0/stream_load/test_stream_load.groovy | 139 +++++++++++++++++++++ .../stream_load/test_stream_load_properties.groovy | 24 +++- 2 files changed, 161 insertions(+), 2 deletions(-) diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy index 90e9c750bca..968c2e56108 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy @@ -942,5 +942,144 @@ suite("test_stream_load", "p0") { // parse k1 default value assertEquals(res[0][0], 1) assertEquals(res[1][0], 1) + + // test two phase commit + def tableName15 = "test_two_phase_commit" + InetSocketAddress address = context.config.feHttpInetSocketAddress + String user = context.config.feHttpUser + String password = context.config.feHttpPassword + String db = context.config.getDbNameByFile(context.file) + + def do_streamload_2pc = { label, txn_operation -> + HttpClients.createDefault().withCloseable { client -> + RequestBuilder requestBuilder = RequestBuilder.put("http://${address.hostString}:${address.port}/api/${db}/${tableName15}/_stream_load_2pc") + String encoding = Base64.getEncoder() + .encodeToString((user + ":" + (password == null ? "" : password)).getBytes("UTF-8")) + requestBuilder.setHeader("Authorization", "Basic ${encoding}") + requestBuilder.setHeader("Expect", "100-Continue") + requestBuilder.setHeader("label", "${label}") + requestBuilder.setHeader("txn_operation", "${txn_operation}") + + String backendStreamLoadUri = null + client.execute(requestBuilder.build()).withCloseable { resp -> + resp.withCloseable { + String body = EntityUtils.toString(resp.getEntity()) + def respCode = resp.getStatusLine().getStatusCode() + // should redirect to backend + if (respCode != 307) { + throw new IllegalStateException("Expect frontend stream load response code is 307, " + + "but meet ${respCode}\nbody: ${body}") + } + backendStreamLoadUri = resp.getFirstHeader("location").getValue() + } + } + + requestBuilder.setUri(backendStreamLoadUri) + try{ + client.execute(requestBuilder.build()).withCloseable { resp -> + resp.withCloseable { + String body = EntityUtils.toString(resp.getEntity()) + def respCode = resp.getStatusLine().getStatusCode() + if (respCode != 200) { + throw new IllegalStateException("Expect backend stream load response code is 200, " + + "but meet ${respCode}\nbody: ${body}") + } + } + } + } catch (Throwable t) { + log.info("StreamLoad Exception: ", t) + } + } + } + + try { + sql """ DROP TABLE IF EXISTS ${tableName15} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName15} ( + `k1` bigint(20) NULL DEFAULT "1", + `k2` bigint(20) NULL , + `v1` tinyint(4) NULL, + `v2` tinyint(4) NULL, + `v3` tinyint(4) NULL, + `v4` DATETIME NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + def label = UUID.randomUUID().toString().replaceAll("-", "") + streamLoad { + table "${tableName15}" + + set 'label', "${label}" + set 'column_separator', '|' + set 'columns', 'k1, k2, v1, v2, v3' + set 'two_phase_commit', 'true' + + file 'test_two_phase_commit.csv' + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(2, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + qt_sql_2pc "select * from ${tableName15} order by k1" + + do_streamload_2pc.call(label, "abort") + + qt_sql_2pc_abort "select * from ${tableName15} order by k1" + + streamLoad { + table "${tableName15}" + + set 'label', "${label}" + set 'column_separator', '|' + set 'columns', 'k1, k2, v1, v2, v3' + set 'two_phase_commit', 'true' + + file 'test_two_phase_commit.csv' + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(2, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + do_streamload_2pc.call(label, "commit") + + def count = 0 + while (true) { + res = sql "select count(*) from ${tableName15}" + if (res[0][0] > 0) { + break + } + if (count >= 60) { + log.error("stream load commit can not visible for long time") + assertEquals(2, res[0][0]) + break + } + sleep(1000) + count++ + } + + qt_sql_2pc_commit "select * from ${tableName15} order by k1" + } finally { + sql """ DROP TABLE IF EXISTS ${tableName15} FORCE""" + } } diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy index 2b8d56a7a47..19e707f3851 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_properties.groovy @@ -559,7 +559,26 @@ suite("test_stream_load_properties", "p0") { do_streamload_2pc.call(txnId, "commit", tableName1) - sleep(60) + def count = 0 + while (true) { + def res + if (i <= 3) { + res = sql "select count(*) from ${tableName1}" + } else { + res = sql "select count(*) from ${tableName1}" + } + if (res[0][0] > 0) { + break + } + if (count >= 60) { + log.error("stream load commit can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(1000) + count++ + } + if (i <= 3) { qt_sql_2pc_commit "select * from ${tableName1} order by k00,k01" } else { @@ -570,7 +589,8 @@ suite("test_stream_load_properties", "p0") { } } finally { for (String tableName in tables) { - sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text + def tableName1 = "stream_load_" + tableName + sql "DROP TABLE IF EXISTS ${tableName1} FORCE" } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
