This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 659a502ade5 branch-3.1: [Enhancement](json load) Set jsonload's
default behavior to be read_json_by_line #55861 (#56736)
659a502ade5 is described below
commit 659a502ade5a18ded6d323ec47dfab4151ddbe70
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 11 11:29:41 2025 +0800
branch-3.1: [Enhancement](json load) Set jsonload's default behavior to be
read_json_by_line #55861 (#56736)
Cherry-picked from #55861
Co-authored-by: Refrain <[email protected]>
---
be/src/http/action/stream_load.cpp | 27 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 2 +-
.../fileformat/JsonFileFormatProperties.java | 18 +-
.../fileformat/JsonFileFormatPropertiesTest.java | 2 +-
.../stream_load/data_by_array_MultiLine.json | 6 +
.../load_p0/stream_load/data_by_array_oneLine.json | 1 +
.../data/load_p0/stream_load/data_by_line.json | 4 +
.../load_p0/stream_load/data_by_multiArray.json | 2 +
.../test_json_load_default_behavior.groovy | 511 +++++++++++++++++++++
9 files changed, 555 insertions(+), 18 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 56412863b38..ae5c231b1eb 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -527,6 +527,23 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
} else {
request.__set_strip_outer_array(false);
}
+
+ if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
+ if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
+ request.__set_read_json_by_line(true);
+ } else {
+ request.__set_read_json_by_line(false);
+ }
+ } else {
+ request.__set_read_json_by_line(false);
+ }
+
+ if (http_req->header(HTTP_READ_JSON_BY_LINE).empty() &&
+ http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
+ request.__set_read_json_by_line(true);
+ request.__set_strip_outer_array(false);
+ }
+
if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) {
request.__set_num_as_string(true);
@@ -546,16 +563,6 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
request.__set_fuzzy_parse(false);
}
- if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
- if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
- request.__set_read_json_by_line(true);
- } else {
- request.__set_read_json_by_line(false);
- }
- } else {
- request.__set_read_json_by_line(false);
- }
-
if (!http_req->header(HTTP_FUNCTION_COLUMN + "." +
HTTP_SEQUENCE_COL).empty()) {
request.__set_sequence_col(
http_req->header(HTTP_FUNCTION_COLUMN + "." +
HTTP_SEQUENCE_COL));
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 3640861de54..4055d4cdec2 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1253,7 +1253,7 @@ Status NewJsonReader::_simdjson_init_reader() {
RETURN_IF_ERROR(_get_range_params());
RETURN_IF_ERROR(_open_file_reader(false));
- if (_read_json_by_line) {
+ if (LIKELY(_read_json_by_line)) {
RETURN_IF_ERROR(_open_line_reader());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
index 77f3691b09f..5d7955c9e75 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
@@ -59,12 +59,18 @@ public class JsonFileFormatProperties extends
FileFormatProperties {
"", isRemoveOriginProperty);
jsonPaths = getOrDefault(formatProperties, PROP_JSON_PATHS,
"", isRemoveOriginProperty);
- readJsonByLine = Boolean.valueOf(
- getOrDefault(formatProperties, PROP_READ_JSON_BY_LINE,
- "false", isRemoveOriginProperty)).booleanValue();
- stripOuterArray = Boolean.valueOf(
- getOrDefault(formatProperties, PROP_STRIP_OUTER_ARRAY,
- "", isRemoveOriginProperty)).booleanValue();
+ if (!formatProperties.containsKey(PROP_READ_JSON_BY_LINE)
+ && !formatProperties.containsKey(PROP_STRIP_OUTER_ARRAY)) {
+ readJsonByLine = true;
+ stripOuterArray = false;
+ } else {
+ readJsonByLine = Boolean.valueOf(
+ getOrDefault(formatProperties, PROP_READ_JSON_BY_LINE,
+ "false",
isRemoveOriginProperty)).booleanValue();
+ stripOuterArray = Boolean.valueOf(
+ getOrDefault(formatProperties, PROP_STRIP_OUTER_ARRAY,
+ "false",
isRemoveOriginProperty)).booleanValue();
+ }
numAsString = Boolean.valueOf(
getOrDefault(formatProperties, PROP_NUM_AS_STRING,
"", isRemoveOriginProperty)).booleanValue();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
index a4b78ccd642..1f1e1b2447e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
@@ -43,7 +43,7 @@ public class JsonFileFormatPropertiesTest {
Assert.assertEquals("", jsonFileFormatProperties.getJsonRoot());
Assert.assertEquals("", jsonFileFormatProperties.getJsonPaths());
Assert.assertEquals(false,
jsonFileFormatProperties.isStripOuterArray());
- Assert.assertEquals(false,
jsonFileFormatProperties.isReadJsonByLine());
+ Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
Assert.assertEquals(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER,
diff --git
a/regression-test/data/load_p0/stream_load/data_by_array_MultiLine.json
b/regression-test/data/load_p0/stream_load/data_by_array_MultiLine.json
new file mode 100644
index 00000000000..1daa72c0dde
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/data_by_array_MultiLine.json
@@ -0,0 +1,6 @@
+[
+ {"a": 1, "b": 11},
+ {"a": 2, "b": 12},
+ {"a": 3, "b": 13},
+ {"a": 4, "b": 14}
+]
\ No newline at end of file
diff --git
a/regression-test/data/load_p0/stream_load/data_by_array_oneLine.json
b/regression-test/data/load_p0/stream_load/data_by_array_oneLine.json
new file mode 100644
index 00000000000..518c99b934d
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/data_by_array_oneLine.json
@@ -0,0 +1 @@
+[{"a": 1, "b": 11},{"a": 2, "b": 12},{"a": 3, "b": 13},{"a": 4, "b": 14}]
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/data_by_line.json
b/regression-test/data/load_p0/stream_load/data_by_line.json
new file mode 100644
index 00000000000..5bb98aef191
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/data_by_line.json
@@ -0,0 +1,4 @@
+{"a": 1, "b": 11}
+{"a": 2, "b": 12}
+{"a": 3, "b": 13}
+{"a": 4, "b": 14}
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/data_by_multiArray.json
b/regression-test/data/load_p0/stream_load/data_by_multiArray.json
new file mode 100644
index 00000000000..e6207801ab5
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/data_by_multiArray.json
@@ -0,0 +1,2 @@
+[{"a": 1, "b": 11},{"a": 2, "b": 12}]
+[{"a": 3, "b": 13},{"a": 4, "b": 14}]
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/stream_load/test_json_load_default_behavior.groovy
b/regression-test/suites/load_p0/stream_load/test_json_load_default_behavior.groovy
new file mode 100644
index 00000000000..83b3531a260
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_json_load_default_behavior.groovy
@@ -0,0 +1,511 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_json_load_default_behavior", "p0") {
+ // case1 test read_json_by_line = true (true, _) - read json by line like
case2
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table1 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ streamLoad {
+ table "test_table1"
+ set 'format', 'json'
+ set 'columns', 'a,b'
+ set 'read_json_by_line', 'true'
+ file 'data_by_line.json'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberLoadedRows, 4) // all 4 rows were loaded successfully
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table1")
+ }
+ // case2: test default behavior (_ _) - check default behavior is read
json by line
+ /*
+ {"a": 1, "b": 11}
+ {"a": 2, "b": 12}
+ {"a": 3, "b": 13}
+ {"a": 4, "b": 14}
+ */
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table2 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ streamLoad {
+ table "test_table2"
+ set 'format', 'json'
+ set 'columns', 'a,b'
+ file 'data_by_line.json'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberLoadedRows, 4) // all 4 rows were loaded successfully
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table2")
+ }
+ // case3: test strip_outer_array=true (_ true) - one line array
+ /*
+ [{"a": 1, "b": 11},{"a": 2, "b": 12},{"a": 3, "b": 13},{"a": 4, "b": 14}]
+ */
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table3 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ streamLoad {
+ table "test_table3"
+ set 'format', 'json'
+ set 'columns', 'a,b'
+ set 'strip_outer_array', 'true'
+ file 'data_by_array_oneLine.json'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberLoadedRows, 4)
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table3")
+ }
+ // case4: test strip_outer_array=true (_ true) - multi line array
+ /*
+ [
+ {"a": 1, "b": 11},
+ {"a": 2, "b": 12},
+ {"a": 3, "b": 13},
+ {"a": 4, "b": 14}
+ ]
+ */
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table4 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ streamLoad {
+ table "test_table4"
+ set 'format', 'json'
+ set 'columns', 'a,b'
+ set 'strip_outer_array', 'true'
+ file 'data_by_array_MultiLine.json'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberLoadedRows, 4)
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table4")
+ }
+
+ // case5: test read_json_by_line=true and strip_outer_array=true (true
true)
+ /*
+ [{"a": 1, "b": 11},{"a": 2, "b": 12}]
+ [{"a": 3, "b": 13},{"a": 4, "b": 14}]
+ */
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table5 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ streamLoad {
+ table "test_table5"
+ set 'format', 'json'
+ set 'columns', 'a,b'
+ set 'read_json_by_line', 'true'
+ set 'strip_outer_array', 'true'
+ file 'data_by_multiArray.json'
+ time 10000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberLoadedRows, 4) // two lines with 2 objects each, 4 rows in total
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table5")
+ }
+
+ // ============================== S3 tvf
=========================================
+ // ============== we just need to test SELECT * FROM S3
==========================
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+ def ak = getS3AK()
+ def sk = getS3SK()
+
+ // case6 test default behavior (_ _)
+ def res1 = sql """
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/load/data_by_line.json",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}",
+ "format" = "json"
+ );
+ """
+ log.info("select frm s3 result: ${res1}".toString())
+ assertTrue(res1.size() == 4) // the result set must contain 4 rows
+
+ // case7 test default behavior (_ _) on a JSON array - expected to fail with:
+ // [DATA_QUALITY_ERROR]JSON data is array-object, `strip_outer_array` must
be TRUE
+ test {
+ sql """
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/load/data_by_array_oneLine.json",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}",
+ "format" = "json"
+ );
+ """
+ exception "JSON data is array-object, `strip_outer_array` must be TRUE"
+ }
+
+ // case8 test strip_outer_array=true (_ true) - one line array
+ def res2 = sql """
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/load/data_by_array_oneLine.json",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}",
+ "format" = "json",
+ "strip_outer_array" = "true"
+ );
+ """
+ log.info("select frm s3 result: ${res2}".toString())
+ assertTrue(res2.size() == 4) // the result set must contain 4 rows
+
+ // case9 test strip_outer_array=true (_ true) - multi line array
+ def res3 = sql """
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/load/data_by_array_MultiLine.json",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}",
+ "format" = "json",
+ "strip_outer_array" = "true"
+ );
+ """
+ log.info("select frm s3 result: ${res3}".toString())
+ assertTrue(res3.size() == 4) // the result set must contain 4 rows
+
+ // ============================== S3 load
=========================================
+
+ // case10: S3 load test default behavior (_ _) - check default behavior is
read json by line
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table_s3_1 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ def label10 = "s3_load_default_" +
UUID.randomUUID().toString().replaceAll("-", "")
+ sql """
+ LOAD LABEL ${label10} (
+ DATA INFILE("s3://${s3BucketName}/load/data_by_line.json")
+ INTO TABLE test_table_s3_1
+ FORMAT AS "json"
+ (a, b)
+ )
+ WITH S3 (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ );
+ """
+
+ // Wait for load to complete
+ def max_try_time = 60000
+ while (max_try_time > 0) {
+ def result = sql "SHOW LOAD WHERE label = '${label10}'"
+ if (result[0][2] == "FINISHED") {
+ sql "sync"
+ def count = sql "SELECT COUNT(*) FROM test_table_s3_1"
+ assertEquals(4, count[0][0])
+ break
+ } else if (result[0][2] == "CANCELLED") {
+ throw new Exception("Load job cancelled: " + result[0][7])
+ }
+ Thread.sleep(1000)
+ max_try_time -= 1000
+ if (max_try_time <= 0) {
+ throw new Exception("Load job timeout")
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table_s3_1")
+ }
+
+ // case11: S3 load test strip_outer_array=true (_ true) - one line array
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table_s3_2 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ def label11 = "s3_load_strip_array_" +
UUID.randomUUID().toString().replaceAll("-", "")
+ sql """
+ LOAD LABEL ${label11} (
+ DATA
INFILE("s3://${s3BucketName}/load/data_by_array_oneLine.json")
+ INTO TABLE test_table_s3_2
+ FORMAT AS "json"
+ (a, b)
+ PROPERTIES(
+ "strip_outer_array" = "true"
+ )
+ )
+ WITH S3 (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ );
+ """
+
+ // Wait for load to complete
+ def max_try_time = 60000
+ while (max_try_time > 0) {
+ def result = sql "SHOW LOAD WHERE label = '${label11}'"
+ if (result[0][2] == "FINISHED") {
+ sql "sync"
+ def count = sql "SELECT COUNT(*) FROM test_table_s3_2"
+ assertEquals(4, count[0][0])
+ break
+ } else if (result[0][2] == "CANCELLED") {
+ throw new Exception("Load job cancelled: " + result[0][7])
+ }
+ Thread.sleep(1000)
+ max_try_time -= 1000
+ if (max_try_time <= 0) {
+ throw new Exception("Load job timeout")
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table_s3_2")
+ }
+
+ // case12: S3 load test strip_outer_array=true (_ true) - multi line array
+ // error S3 load must use read_json_by_line = true
+
+ // case13: S3 load test read_json_by_line=true and strip_outer_array=true
(true true)
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table_s3_4 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ def label13 = "s3_load_both_true_" +
UUID.randomUUID().toString().replaceAll("-", "")
+ sql """
+ LOAD LABEL ${label13} (
+ DATA
INFILE("s3://${s3BucketName}/load/data_by_multiArray.json")
+ INTO TABLE test_table_s3_4
+ FORMAT AS "json"
+ (a, b)
+ PROPERTIES(
+ "read_json_by_line" = "true",
+ "strip_outer_array" = "true"
+ )
+ )
+ WITH S3 (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ );
+ """
+
+ // Wait for load to complete
+ def max_try_time = 60000
+ while (max_try_time > 0) {
+ def result = sql "SHOW LOAD WHERE label = '${label13}'"
+ if (result[0][2] == "FINISHED") {
+ sql "sync"
+ def count = sql "SELECT COUNT(*) FROM test_table_s3_4"
+ assertEquals(4, count[0][0]) // two lines with 2 objects each, 4 rows in total
+ break
+ } else if (result[0][2] == "CANCELLED") {
+ throw new Exception("Load job cancelled: " + result[0][7])
+ }
+ Thread.sleep(1000)
+ max_try_time -= 1000
+ if (max_try_time <= 0) {
+ throw new Exception("Load job timeout")
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table_s3_4")
+ }
+
+ // case14: S3 load test read_json_by_line=true (true _) - explicit read
json by line
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS test_table_s3_5 (
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ def label14 = "s3_load_read_by_line_" +
UUID.randomUUID().toString().replaceAll("-", "")
+ sql """
+ LOAD LABEL ${label14} (
+ DATA INFILE("s3://${s3BucketName}/load/data_by_line.json")
+ INTO TABLE test_table_s3_5
+ FORMAT AS "json"
+ (a, b)
+ PROPERTIES(
+ "read_json_by_line" = "true"
+ )
+ )
+ WITH S3 (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ );
+ """
+
+ // Wait for load to complete
+ def max_try_time = 60000
+ while (max_try_time > 0) {
+ def result = sql "SHOW LOAD WHERE label = '${label14}'"
+ if (result[0][2] == "FINISHED") {
+ sql "sync"
+ def count = sql "SELECT COUNT(*) FROM test_table_s3_5"
+ assertEquals(4, count[0][0])
+ break
+ } else if (result[0][2] == "CANCELLED") {
+ throw new Exception("Load job cancelled: " + result[0][7])
+ }
+ Thread.sleep(1000)
+ max_try_time -= 1000
+ if (max_try_time <= 0) {
+ throw new Exception("Load job timeout")
+ }
+ }
+ } finally {
+ try_sql("DROP TABLE IF EXISTS test_table_s3_5")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]