This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6a4c0d8706b [fix](routine-load) pause job when json path is invalid
#30197
6a4c0d8706b is described below
commit 6a4c0d8706b7b132a081029c0ab223fe7d1c8662
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Jan 22 20:34:14 2024 +0800
[fix](routine-load) pause job when json path is invalid #30197
If jsonpaths is set incorrectly, the routine load job reports an error but keeps
running indefinitely instead of being paused. For example:
CREATE ROUTINE LOAD jobName ON tableName
PROPERTIES
(
"format" = "json",
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"jsonpaths" = "[\'t\',\'a\']"
)
FROM KAFKA
(
"kafka_broker_list" = "$IP:PORT",
"kafka_topic" = "XXX",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
The jsonpaths value ['t','a'] is invalid, but the job keeps running indefinitely. With this change, the job is paused instead.
---
be/src/common/status.h | 2 +
be/src/vec/exec/format/json/new_json_reader.cpp | 6 +-
.../doris/load/routineload/RoutineLoadJob.java | 10 ++
.../apache/doris/transaction/TransactionState.java | 3 +-
gensrc/thrift/Status.thrift | 1 +
.../routine_load/data/invalid_json_path.json | 20 ++++
.../routine_load/test_routine_load_error.groovy | 110 +++++++++++++++++++++
7 files changed, 148 insertions(+), 4 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 2bec1c397e8..a4ab9f60b0f 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -35,6 +35,7 @@ namespace ErrorCode {
TStatusError(MEM_ALLOC_FAILED, true); \
TStatusError(BUFFER_ALLOCATION_FAILED, true); \
TStatusError(INVALID_ARGUMENT, false); \
+ TStatusError(INVALID_JSON_PATH, false); \
TStatusError(MINIMUM_RESERVATION_UNAVAILABLE, true); \
TStatusError(CORRUPTION, true); \
TStatusError(IO_ERROR, true); \
@@ -405,6 +406,7 @@ public:
ERROR_CTOR(MemoryAllocFailed, MEM_ALLOC_FAILED)
ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
+ ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
ERROR_CTOR(Corruption, CORRUPTION)
ERROR_CTOR(IOError, IO_ERROR)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 21b99ff9720..97affdcd0bb 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -414,12 +414,12 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
rapidjson::Document jsonpaths_doc;
if (!jsonpaths_doc.Parse(_jsonpaths.c_str(),
_jsonpaths.length()).HasParseError()) {
if (!jsonpaths_doc.IsArray()) {
- return Status::InvalidArgument("Invalid json path: {}",
_jsonpaths);
+ return Status::InvalidJsonPath("Invalid json path: {}",
_jsonpaths);
}
for (int i = 0; i < jsonpaths_doc.Size(); i++) {
const rapidjson::Value& path = jsonpaths_doc[i];
if (!path.IsString()) {
- return Status::InvalidArgument("Invalid json path: {}",
_jsonpaths);
+ return Status::InvalidJsonPath("Invalid json path: {}",
_jsonpaths);
}
std::vector<JsonPath> parsed_paths;
JsonFunctions::parse_json_paths(path.GetString(),
&parsed_paths);
@@ -427,7 +427,7 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
}
} else {
- return Status::InvalidArgument("Invalid json path: {}",
_jsonpaths);
+ return Status::InvalidJsonPath("Invalid json path: {}",
_jsonpaths);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 889d240ce29..9ce8bb2e5ac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1145,6 +1145,16 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
String msg;
if (txnStatusChangeReason != null) {
switch (txnStatusChangeReason) {
+ case INVALID_JSON_PATH:
+ msg = "be " + taskBeId + " abort task,"
+ + " task id: " +
routineLoadTaskInfo.getId()
+ + " job id: " +
routineLoadTaskInfo.getJobId()
+ + " with reason: " +
txnStatusChangeReasonString
+ + " please check the jsonpaths";
+ updateState(JobState.PAUSED,
+ new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ false /* not replay */);
+ return;
case OFFSET_OUT_OF_RANGE:
msg = "be " + taskBeId + " abort task,"
+ " task id: " +
routineLoadTaskInfo.getId()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 59e3d7c25b3..d1a59186862 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -108,7 +108,8 @@ public class TransactionState implements Writable {
TIMEOUT,
OFFSET_OUT_OF_RANGE,
PAUSE,
- NO_PARTITIONS;
+ NO_PARTITIONS,
+ INVALID_JSON_PATH;
public static TxnStatusChangeReason fromString(String reasonString) {
for (TxnStatusChangeReason txnStatusChangeReason :
TxnStatusChangeReason.values()) {
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 039fd8abe1b..5805430eead 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -75,6 +75,7 @@ enum TStatusCode {
INCOMPLETE = 44,
OLAP_ERR_VERSION_ALREADY_MERGED = 45,
DATA_QUALITY_ERROR = 46,
+ INVALID_JSON_PATH = 47,
VEC_EXCEPTION = 50,
VEC_LOGIC_ERROR = 51,
diff --git
a/regression-test/suites/load_p0/routine_load/data/invalid_json_path.json
b/regression-test/suites/load_p0/routine_load/data/invalid_json_path.json
new file mode 100644
index 00000000000..6ef920fcad9
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/invalid_json_path.json
@@ -0,0 +1,20 @@
+{"k00": "21", "k01": "2023-08-18", "k02": "0", "k03": "63", "k04": "-27847",
"k05": "-35409596", "k06": "8638201997392767650", "k07": "4919963231735304178",
"k08": "-23382.541", "k09": "-1803403621.4263129", "k10": "-22009767", "k11":
"\\N", "k12": "2023-03-31 10:56:14", "k13": "2023-01-20", "k14": "2023-02-18
13:37:52", "k15": "N", "k16": "T", "k17":
"PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi",
"k18": "\\N"}
+{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573",
"k05": "-1362465190", "k06": "3990845741226497177", "k07":
"2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10":
"\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14":
"2023-07-16 05:03:13", "k15": "D", "k16": "", "k17":
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme",
"k18": "\\N"}
+{"k00": "91", "k01": "2023-08-27", "k02": "1", "k03": "90", "k04": "2465",
"k05": "702240964", "k06": "6373830997821598984", "k07": "305860046137409400",
"k08": "15991.356", "k09": "1599972327.386147", "k10": "\\N", "k11": "\\N",
"k12": "2023-04-26 19:31:10", "k13": "2023-07-21", "k14": "\\N", "k15": "2",
"k16": "", "k17":
"B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK",
"k18": "\\N"}
+{"k00": "80", "k01": "2023-08-18", "k02": "0", "k03": "-18", "k04": "-8971",
"k05": "679027874", "k06": "6535956962935330265", "k07": "3960889045799757165",
"k08": "-13219.76", "k09": "1187161924.505394", "k10": "\\N", "k11": "\\N",
"k12": "2023-03-11 07:40:00", "k13": "2022-11-29", "k14": "2023-01-14
07:24:07", "k15": "N", "k16": "D", "k17":
"3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve",
"k18": "\\N"}
+{"k00": "20", "k01": "2023-08-17", "k02": "0", "k03": "-5", "k04": "18158",
"k05": "784479801", "k06": "1485484354598941738", "k07":
"-6632681928222776815", "k08": "9708.4307", "k09": "-330432620.706069", "k10":
"\\N", "k11": "\\N", "k12": "2022-09-15 21:40:55", "k13": "2023-02-23", "k14":
"2023-08-13 21:31:54", "k15": "O", "k16": "X", "k17":
"2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn",
"k18": "\\N"}
+{"k00": "66", "k01": "2023-08-15", "k02": "1", "k03": "-91", "k04": "28378",
"k05": "609923317", "k06": "4872185586197131212", "k07": "1207709464099378591",
"k08": "\\N", "k09": "-1863683325.9851229", "k10": "\\N", "k11": "\\N", "k12":
"2022-09-24 10:39:23", "k13": "2022-09-24", "k14": "2022-10-16 18:36:43",
"k15": "Y", "k16": "z", "k17":
"AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6",
"k18": "\\N"}
+{"k00": "49", "k01": "2023-08-08", "k02": "0", "k03": "\\N", "k04": "16275",
"k05": "-2144851675", "k06": "-2303421957908954634", "k07":
"-46526938720058765", "k08": "-13141.143", "k09": "-686632233.2302", "k10":
"\\N", "k11": "\\N", "k12": "2022-09-01 00:16:01", "k13": "2023-03-25", "k14":
"2022-09-07 14:59:03", "k15": "s", "k16": "", "k17": "yvuILR2iNxfe8RRml",
"k18": "\\N"}
+{"k00": "57", "k01": "2023-08-19", "k02": "1", "k03": "2", "k04": "-25462",
"k05": "-74112029", "k06": "6458082754318544493", "k07":
"-7910671781690629051", "k08": "-15205.859", "k09": "-306870797.484914", "k10":
"\\N", "k11": "\\N", "k12": "2023-07-10 18:39:10", "k13": "2023-02-12", "k14":
"2023-01-27 07:26:06", "k15": "y", "k16": "", "k17":
"Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ", "k18": "\\N"}
+{"k00": "31", "k01": "2023-08-27", "k02": "0", "k03": "17", "k04": "-18849",
"k05": "1728109133", "k06": "3266501886640700374", "k07": "527195452623418935",
"k08": "-24062.328", "k09": "-1514348021.262435", "k10": "\\N", "k11": "\\N",
"k12": "2022-10-07 03:24:23", "k13": "2022-09-25", "k14": "\\N", "k15": "0",
"k16": "8", "k17":
"yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4", "k18":
"\\N"}
+{"k00": "81", "k01": "2023-08-23", "k02": "0", "k03": "106", "k04": "11492",
"k05": "-667795397", "k06": "4480250461471356146", "k07":
"-5346660566234294101", "k08": "9082.75", "k09": "385167225.902608", "k10":
"\\N", "k11": "\\N", "k12": "2023-03-20 03:33:16", "k13": "2022-11-24", "k14":
"2023-02-16 18:29:41", "k15": "G", "k16": "9", "k17":
"Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag", "k18": "\\N"}
+{"k00": "58", "k01": "2023-08-22", "k02": "\\N", "k03": "0", "k04": "-18231",
"k05": "1832867360", "k06": "6997858407575297145", "k07":
"2480714305422728023", "k08": "-5450.4888", "k09": "1475901032.138386", "k10":
"\\N", "k11": "\\N", "k12": "2023-02-02 05:13:24", "k13": "2022-09-18", "k14":
"2023-04-23 10:51:15", "k15": "k", "k16": "", "k17":
"LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja",
"k18": "\\N"}
+{"k00": "85", "k01": "2023-08-11", "k02": "1", "k03": "-7", "k04": "24304",
"k05": "-2043877415", "k06": "-2024144417867729183", "k07": "\\N", "k08":
"5363.0244", "k09": "-578615669.042831", "k10": "\\N", "k11": "\\N", "k12":
"2023-07-15 01:07:41", "k13": "2023-08-13", "k14": "2023-01-20 11:57:48",
"k15": "i", "k16": "", "k17": "WQ9dh9ajPu0y", "k18": "\\N"}
+{"k00": "60", "k01": "2023-08-27", "k02": "0", "k03": "-52", "k04": "-2338",
"k05": "-757056972", "k06": "1047567408607120856", "k07":
"6541476642780646552", "k08": "6614.0894", "k09": "-1204448798.5178549", "k10":
"\\N", "k11": "\\N", "k12": "2022-12-29 14:47:30", "k13": "2022-09-24", "k14":
"2023-08-01 12:41:59", "k15": "O", "k16": "F", "k17":
"RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU",
"k18": "\\N"}
+{"k00": "41", "k01": "2023-08-27", "k02": "1", "k03": "-104", "k04": "22750",
"k05": "\\N", "k06": "8527773271030840740", "k07": "5554497317268279215",
"k08": "-5296.8281", "k09": "-1715646888.01304", "k10": "\\N", "k11": "\\N",
"k12": "2022-12-02 17:56:44", "k13": "2022-10-12", "k14": "2023-02-19
07:02:54", "k15": "V", "k16": "", "k17":
"E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V",
"k18": "\\N"}
+{"k00": "62", "k01": "2023-08-21", "k02": "0", "k03": "81", "k04": "20302",
"k05": "-200761532", "k06": "6365479976421007608", "k07": "\\N", "k08":
"-29916.533", "k09": "1709141750.8284781", "k10": "\\N", "k11": "\\N", "k12":
"2023-05-04 01:14:51", "k13": "2022-09-17", "k14": "2022-12-04 19:30:09",
"k15": "d", "k16": "v", "k17":
"BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD",
"k18": "\\N"}
+{"k00": "50", "k01": "2023-08-24", "k02": "1", "k03": "15", "k04": "14403",
"k05": "\\N", "k06": "-6418906115745394180", "k07": "9205303779366462513",
"k08": "-4331.5488", "k09": "-615112179.557648", "k10": "\\N", "k11": "\\N",
"k12": "2022-12-29 02:27:20", "k13": "2023-06-01", "k14": "2023-08-12
04:50:04", "k15": "a", "k16": "", "k17":
"eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM",
"k18": "\\N"}
+{"k00": "50", "k01": "2023-08-06", "k02": "1", "k03": "109", "k04": "-6330",
"k05": "1479023892", "k06": "-8630800697573159428", "k07":
"-1645095773540208759", "k08": "17880.961", "k09": "-1453844792.0139489",
"k10": "\\N", "k11": "\\N", "k12": "2022-09-22 02:03:21", "k13": "2023-05-14",
"k14": "2023-03-25 02:18:34", "k15": "m", "k16": "", "k17":
"JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl",
"k18": "\\N"}
+{"k00": "68", "k01": "2023-08-23", "k02": "1", "k03": "-73", "k04": "20117",
"k05": "1737338128", "k06": "795638676048937749", "k07":
"-5551546237562433901", "k08": "-30627.039", "k09": "68589475.684545", "k10":
"\\N", "k11": "\\N", "k12": "2022-12-28 20:26:51", "k13": "2022-10-04", "k14":
"2023-07-30 00:20:06", "k15": "y", "k16": "", "k17":
"keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM",
"k18": "\\N"}
+{"k00": "90", "k01": "2023-08-27", "k02": "1", "k03": "22", "k04": "16456",
"k05": "-1476824962", "k06": "-3279894870153540825", "k07":
"8990195191470116763", "k08": "26651.906", "k09": "206860148.942546", "k10":
"\\N", "k11": "\\N", "k12": "2022-10-07 03:11:03", "k13": "2023-03-18", "k14":
"2023-04-15 00:38:33", "k15": "T", "k16": "L", "k17":
"QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD",
"k18": "\\N"}
+{"k00": "65", "k01": "2023-08-09", "k02": "0", "k03": "94", "k04": "31514",
"k05": "814994517", "k06": "-297697460695940343", "k07": "734910652450318597",
"k08": "-13061.892", "k09": "62750847.041706", "k10": "-9808654", "k11": "\\N",
"k12": "2023-08-14 22:01:27", "k13": "2023-05-19", "k14": "2022-11-13
13:44:28", "k15": "V", "k16": "", "k17":
"aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf",
"k18": "\\N"}
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index c4180e35f4d..191ea4381fd 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -25,6 +25,10 @@ suite("test_routine_load_error","p0") {
"multi_table_load_invalid_table",
]
+ def kafkaJsonTopics = [
+ "invalid_json_path",
+ ]
+
String enabled = context.config.otherConfigs.get("enableKafkaTest")
String kafka_port = context.config.otherConfigs.get("kafka_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -48,6 +52,15 @@ suite("test_routine_load_error","p0") {
producer.send(record)
}
}
+ for (String kafkaJsonTopic in kafkaJsonTopics) {
+ def kafkaJson = new
File("""${context.file.parent}/data/${kafkaJsonTopic}.json""").text
+ def lines = kafkaJson.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaJsonTopic, null, line)
+ producer.send(record)
+ }
+ }
}
def i = 0
@@ -138,4 +151,101 @@ suite("test_routine_load_error","p0") {
sql "stop routine load for ${jobName}"
}
}
+
+ // test json path is invalid
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def tableName = "test_invalid_json_path"
+ def jobName = "invalid_json_path"
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 array<BOOLEAN> NULL,
+ k02 array<TINYINT> NULL,
+ k03 array<SMALLINT> NULL,
+ k04 array<INT> NULL,
+ k05 array<BIGINT> NULL,
+ k06 array<LARGEINT> NULL,
+ k07 array<FLOAT> NULL,
+ k08 array<DOUBLE> NULL,
+ k09 array<DECIMAL> NULL,
+ k10 array<DECIMALV3> NULL,
+ k11 array<DATE> NULL,
+ k12 array<DATETIME> NULL,
+ k13 array<DATEV2> NULL,
+ k14 array<DATETIMEV2> NULL,
+ k15 array<CHAR> NULL,
+ k16 array<VARCHAR> NULL,
+ k17 array<STRING> NULL,
+ kd01 array<BOOLEAN> NOT NULL DEFAULT "[]",
+ kd02 array<TINYINT> NOT NULL DEFAULT "[]",
+ kd03 array<SMALLINT> NOT NULL DEFAULT "[]",
+ kd04 array<INT> NOT NULL DEFAULT "[]",
+ kd05 array<BIGINT> NOT NULL DEFAULT "[]",
+ kd06 array<LARGEINT> NOT NULL DEFAULT "[]",
+ kd07 array<FLOAT> NOT NULL DEFAULT "[]",
+ kd08 array<DOUBLE> NOT NULL DEFAULT "[]",
+ kd09 array<DECIMAL> NOT NULL DEFAULT "[]",
+ kd10 array<DECIMALV3> NOT NULL DEFAULT "[]",
+ kd11 array<DATE> NOT NULL DEFAULT "[]",
+ kd12 array<DATETIME> NOT NULL DEFAULT "[]",
+ kd13 array<DATEV2> NOT NULL DEFAULT "[]",
+ kd14 array<DATETIMEV2> NOT NULL DEFAULT "[]",
+ kd15 array<CHAR> NOT NULL DEFAULT "[]",
+ kd16 array<VARCHAR> NOT NULL DEFAULT "[]",
+ kd17 array<STRING> NOT NULL DEFAULT "[]"
+ )
+ UNIQUE KEY(k00)
+ DISTRIBUTED BY HASH(k00) BUCKETS 32
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17)
+ PROPERTIES
+ (
+ "format" = "json",
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200",
+ "jsonpaths" = "[\'t\',\'a\']"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${jobName}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ log.info("routine load state:
${res[0][8].toString()}".toString())
+ log.info("routine load statistic:
${res[0][14].toString()}".toString())
+ log.info("reason of state changed:
${res[0][17].toString()}".toString())
+ if (state != "PAUSED") {
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ log.info("reason of state changed:
${res[0][17].toString()}".toString())
+ assertTrue(res[0][17].toString().contains("Invalid json path"))
+ break;
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ }
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]