This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a4c0d8706b [fix](routine-load) pause job when json path is invalid 
#30197
6a4c0d8706b is described below

commit 6a4c0d8706b7b132a081029c0ab223fe7d1c8662
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Jan 22 20:34:14 2024 +0800

    [fix](routine-load) pause job when json path is invalid #30197
    
    If jsonpaths is set incorrectly, the routine load job reports an error but 
keeps running indefinitely. For example:
    
    CREATE ROUTINE LOAD jobName ON tableName
    PROPERTIES
    (
        "format" = "json",
        "max_batch_interval" = "5",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "jsonpaths" = "[\'t\',\'a\']"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "$IP:PORT",
        "kafka_topic" = "XXX",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    The jsonpaths value ['t','a'] is invalid, but the job keeps running indefinitely.
---
 be/src/common/status.h                             |   2 +
 be/src/vec/exec/format/json/new_json_reader.cpp    |   6 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  10 ++
 .../apache/doris/transaction/TransactionState.java |   3 +-
 gensrc/thrift/Status.thrift                        |   1 +
 .../routine_load/data/invalid_json_path.json       |  20 ++++
 .../routine_load/test_routine_load_error.groovy    | 110 +++++++++++++++++++++
 7 files changed, 148 insertions(+), 4 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 2bec1c397e8..a4ab9f60b0f 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -35,6 +35,7 @@ namespace ErrorCode {
     TStatusError(MEM_ALLOC_FAILED, true);                \
     TStatusError(BUFFER_ALLOCATION_FAILED, true);        \
     TStatusError(INVALID_ARGUMENT, false);               \
+    TStatusError(INVALID_JSON_PATH, false);              \
     TStatusError(MINIMUM_RESERVATION_UNAVAILABLE, true); \
     TStatusError(CORRUPTION, true);                      \
     TStatusError(IO_ERROR, true);                        \
@@ -405,6 +406,7 @@ public:
     ERROR_CTOR(MemoryAllocFailed, MEM_ALLOC_FAILED)
     ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
     ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
+    ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
     ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
     ERROR_CTOR(Corruption, CORRUPTION)
     ERROR_CTOR(IOError, IO_ERROR)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 21b99ff9720..97affdcd0bb 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -414,12 +414,12 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
         rapidjson::Document jsonpaths_doc;
         if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), 
_jsonpaths.length()).HasParseError()) {
             if (!jsonpaths_doc.IsArray()) {
-                return Status::InvalidArgument("Invalid json path: {}", 
_jsonpaths);
+                return Status::InvalidJsonPath("Invalid json path: {}", 
_jsonpaths);
             }
             for (int i = 0; i < jsonpaths_doc.Size(); i++) {
                 const rapidjson::Value& path = jsonpaths_doc[i];
                 if (!path.IsString()) {
-                    return Status::InvalidArgument("Invalid json path: {}", 
_jsonpaths);
+                    return Status::InvalidJsonPath("Invalid json path: {}", 
_jsonpaths);
                 }
                 std::vector<JsonPath> parsed_paths;
                 JsonFunctions::parse_json_paths(path.GetString(), 
&parsed_paths);
@@ -427,7 +427,7 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
             }
 
         } else {
-            return Status::InvalidArgument("Invalid json path: {}", 
_jsonpaths);
+            return Status::InvalidJsonPath("Invalid json path: {}", 
_jsonpaths);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 889d240ce29..9ce8bb2e5ac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1145,6 +1145,16 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
                     String msg;
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
+                            case INVALID_JSON_PATH:
+                                msg = "be " + taskBeId + " abort task,"
+                                        + " task id: " + 
routineLoadTaskInfo.getId()
+                                        + " job id: " + 
routineLoadTaskInfo.getJobId()
+                                        + " with reason: " + 
txnStatusChangeReasonString
+                                        + " please check the jsonpaths";
+                                updateState(JobState.PAUSED,
+                                        new 
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+                                        false /* not replay */);
+                                return;
                             case OFFSET_OUT_OF_RANGE:
                                 msg = "be " + taskBeId + " abort task,"
                                         + " task id: " + 
routineLoadTaskInfo.getId()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 59e3d7c25b3..d1a59186862 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -108,7 +108,8 @@ public class TransactionState implements Writable {
         TIMEOUT,
         OFFSET_OUT_OF_RANGE,
         PAUSE,
-        NO_PARTITIONS;
+        NO_PARTITIONS,
+        INVALID_JSON_PATH;
 
         public static TxnStatusChangeReason fromString(String reasonString) {
             for (TxnStatusChangeReason txnStatusChangeReason : 
TxnStatusChangeReason.values()) {
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 039fd8abe1b..5805430eead 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -75,6 +75,7 @@ enum TStatusCode {
     INCOMPLETE          = 44,
     OLAP_ERR_VERSION_ALREADY_MERGED = 45,
     DATA_QUALITY_ERROR  = 46,
+    INVALID_JSON_PATH   = 47,
 
     VEC_EXCEPTION = 50,
     VEC_LOGIC_ERROR = 51,
diff --git 
a/regression-test/suites/load_p0/routine_load/data/invalid_json_path.json 
b/regression-test/suites/load_p0/routine_load/data/invalid_json_path.json
new file mode 100644
index 00000000000..6ef920fcad9
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/invalid_json_path.json
@@ -0,0 +1,20 @@
+{"k00": "21", "k01": "2023-08-18", "k02": "0", "k03": "63", "k04": "-27847", 
"k05": "-35409596", "k06": "8638201997392767650", "k07": "4919963231735304178", 
"k08": "-23382.541", "k09": "-1803403621.4263129", "k10": "-22009767", "k11": 
"\\N", "k12": "2023-03-31 10:56:14", "k13": "2023-01-20", "k14": "2023-02-18 
13:37:52", "k15": "N", "k16": "T", "k17": 
"PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi", 
"k18": "\\N"}
+{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", 
"k05": "-1362465190", "k06": "3990845741226497177", "k07": 
"2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": 
"\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": 
"2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": 
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", 
"k18": "\\N"}
+{"k00": "91", "k01": "2023-08-27", "k02": "1", "k03": "90", "k04": "2465", 
"k05": "702240964", "k06": "6373830997821598984", "k07": "305860046137409400", 
"k08": "15991.356", "k09": "1599972327.386147", "k10": "\\N", "k11": "\\N", 
"k12": "2023-04-26 19:31:10", "k13": "2023-07-21", "k14": "\\N", "k15": "2", 
"k16": "", "k17": 
"B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK",
 "k18": "\\N"}
+{"k00": "80", "k01": "2023-08-18", "k02": "0", "k03": "-18", "k04": "-8971", 
"k05": "679027874", "k06": "6535956962935330265", "k07": "3960889045799757165", 
"k08": "-13219.76", "k09": "1187161924.505394", "k10": "\\N", "k11": "\\N", 
"k12": "2023-03-11 07:40:00", "k13": "2022-11-29", "k14": "2023-01-14 
07:24:07", "k15": "N", "k16": "D", "k17": 
"3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve",
 "k18": "\\N"}
+{"k00": "20", "k01": "2023-08-17", "k02": "0", "k03": "-5", "k04": "18158", 
"k05": "784479801", "k06": "1485484354598941738", "k07": 
"-6632681928222776815", "k08": "9708.4307", "k09": "-330432620.706069", "k10": 
"\\N", "k11": "\\N", "k12": "2022-09-15 21:40:55", "k13": "2023-02-23", "k14": 
"2023-08-13 21:31:54", "k15": "O", "k16": "X", "k17": 
"2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn",
 "k18": "\\N"}
+{"k00": "66", "k01": "2023-08-15", "k02": "1", "k03": "-91", "k04": "28378", 
"k05": "609923317", "k06": "4872185586197131212", "k07": "1207709464099378591", 
"k08": "\\N", "k09": "-1863683325.9851229", "k10": "\\N", "k11": "\\N", "k12": 
"2022-09-24 10:39:23", "k13": "2022-09-24", "k14": "2022-10-16 18:36:43", 
"k15": "Y", "k16": "z", "k17": 
"AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6",
 "k18": "\\N"}
+{"k00": "49", "k01": "2023-08-08", "k02": "0", "k03": "\\N", "k04": "16275", 
"k05": "-2144851675", "k06": "-2303421957908954634", "k07": 
"-46526938720058765", "k08": "-13141.143", "k09": "-686632233.2302", "k10": 
"\\N", "k11": "\\N", "k12": "2022-09-01 00:16:01", "k13": "2023-03-25", "k14": 
"2022-09-07 14:59:03", "k15": "s", "k16": "", "k17": "yvuILR2iNxfe8RRml", 
"k18": "\\N"}
+{"k00": "57", "k01": "2023-08-19", "k02": "1", "k03": "2", "k04": "-25462", 
"k05": "-74112029", "k06": "6458082754318544493", "k07": 
"-7910671781690629051", "k08": "-15205.859", "k09": "-306870797.484914", "k10": 
"\\N", "k11": "\\N", "k12": "2023-07-10 18:39:10", "k13": "2023-02-12", "k14": 
"2023-01-27 07:26:06", "k15": "y", "k16": "", "k17": 
"Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ", "k18": "\\N"}
+{"k00": "31", "k01": "2023-08-27", "k02": "0", "k03": "17", "k04": "-18849", 
"k05": "1728109133", "k06": "3266501886640700374", "k07": "527195452623418935", 
"k08": "-24062.328", "k09": "-1514348021.262435", "k10": "\\N", "k11": "\\N", 
"k12": "2022-10-07 03:24:23", "k13": "2022-09-25", "k14": "\\N", "k15": "0", 
"k16": "8", "k17": 
"yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4", "k18": 
"\\N"}
+{"k00": "81", "k01": "2023-08-23", "k02": "0", "k03": "106", "k04": "11492", 
"k05": "-667795397", "k06": "4480250461471356146", "k07": 
"-5346660566234294101", "k08": "9082.75", "k09": "385167225.902608", "k10": 
"\\N", "k11": "\\N", "k12": "2023-03-20 03:33:16", "k13": "2022-11-24", "k14": 
"2023-02-16 18:29:41", "k15": "G", "k16": "9", "k17": 
"Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag", "k18": "\\N"}
+{"k00": "58", "k01": "2023-08-22", "k02": "\\N", "k03": "0", "k04": "-18231", 
"k05": "1832867360", "k06": "6997858407575297145", "k07": 
"2480714305422728023", "k08": "-5450.4888", "k09": "1475901032.138386", "k10": 
"\\N", "k11": "\\N", "k12": "2023-02-02 05:13:24", "k13": "2022-09-18", "k14": 
"2023-04-23 10:51:15", "k15": "k", "k16": "", "k17": 
"LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja",
 "k18": "\\N"}
+{"k00": "85", "k01": "2023-08-11", "k02": "1", "k03": "-7", "k04": "24304", 
"k05": "-2043877415", "k06": "-2024144417867729183", "k07": "\\N", "k08": 
"5363.0244", "k09": "-578615669.042831", "k10": "\\N", "k11": "\\N", "k12": 
"2023-07-15 01:07:41", "k13": "2023-08-13", "k14": "2023-01-20 11:57:48", 
"k15": "i", "k16": "", "k17": "WQ9dh9ajPu0y", "k18": "\\N"}
+{"k00": "60", "k01": "2023-08-27", "k02": "0", "k03": "-52", "k04": "-2338", 
"k05": "-757056972", "k06": "1047567408607120856", "k07": 
"6541476642780646552", "k08": "6614.0894", "k09": "-1204448798.5178549", "k10": 
"\\N", "k11": "\\N", "k12": "2022-12-29 14:47:30", "k13": "2022-09-24", "k14": 
"2023-08-01 12:41:59", "k15": "O", "k16": "F", "k17": 
"RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU", 
"k18": "\\N"}
+{"k00": "41", "k01": "2023-08-27", "k02": "1", "k03": "-104", "k04": "22750", 
"k05": "\\N", "k06": "8527773271030840740", "k07": "5554497317268279215", 
"k08": "-5296.8281", "k09": "-1715646888.01304", "k10": "\\N", "k11": "\\N", 
"k12": "2022-12-02 17:56:44", "k13": "2022-10-12", "k14": "2023-02-19 
07:02:54", "k15": "V", "k16": "", "k17": 
"E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V",
 "k18": "\\N"}
+{"k00": "62", "k01": "2023-08-21", "k02": "0", "k03": "81", "k04": "20302", 
"k05": "-200761532", "k06": "6365479976421007608", "k07": "\\N", "k08": 
"-29916.533", "k09": "1709141750.8284781", "k10": "\\N", "k11": "\\N", "k12": 
"2023-05-04 01:14:51", "k13": "2022-09-17", "k14": "2022-12-04 19:30:09", 
"k15": "d", "k16": "v", "k17": 
"BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD",
 "k18": "\\N"}
+{"k00": "50", "k01": "2023-08-24", "k02": "1", "k03": "15", "k04": "14403", 
"k05": "\\N", "k06": "-6418906115745394180", "k07": "9205303779366462513", 
"k08": "-4331.5488", "k09": "-615112179.557648", "k10": "\\N", "k11": "\\N", 
"k12": "2022-12-29 02:27:20", "k13": "2023-06-01", "k14": "2023-08-12 
04:50:04", "k15": "a", "k16": "", "k17": 
"eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM",
 "k18": "\\N"}
+{"k00": "50", "k01": "2023-08-06", "k02": "1", "k03": "109", "k04": "-6330", 
"k05": "1479023892", "k06": "-8630800697573159428", "k07": 
"-1645095773540208759", "k08": "17880.961", "k09": "-1453844792.0139489", 
"k10": "\\N", "k11": "\\N", "k12": "2022-09-22 02:03:21", "k13": "2023-05-14", 
"k14": "2023-03-25 02:18:34", "k15": "m", "k16": "", "k17": 
"JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl",
 "k18": "\\N"}
+{"k00": "68", "k01": "2023-08-23", "k02": "1", "k03": "-73", "k04": "20117", 
"k05": "1737338128", "k06": "795638676048937749", "k07": 
"-5551546237562433901", "k08": "-30627.039", "k09": "68589475.684545", "k10": 
"\\N", "k11": "\\N", "k12": "2022-12-28 20:26:51", "k13": "2022-10-04", "k14": 
"2023-07-30 00:20:06", "k15": "y", "k16": "", "k17": 
"keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM",
 "k18": "\\N"}
+{"k00": "90", "k01": "2023-08-27", "k02": "1", "k03": "22", "k04": "16456", 
"k05": "-1476824962", "k06": "-3279894870153540825", "k07": 
"8990195191470116763", "k08": "26651.906", "k09": "206860148.942546", "k10": 
"\\N", "k11": "\\N", "k12": "2022-10-07 03:11:03", "k13": "2023-03-18", "k14": 
"2023-04-15 00:38:33", "k15": "T", "k16": "L", "k17": 
"QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD",
 "k18": "\\N"}
+{"k00": "65", "k01": "2023-08-09", "k02": "0", "k03": "94", "k04": "31514", 
"k05": "814994517", "k06": "-297697460695940343", "k07": "734910652450318597", 
"k08": "-13061.892", "k09": "62750847.041706", "k10": "-9808654", "k11": "\\N", 
"k12": "2023-08-14 22:01:27", "k13": "2023-05-19", "k14": "2022-11-13 
13:44:28", "k15": "V", "k16": "", "k17": 
"aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf",
 "k18": "\\N"}
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index c4180e35f4d..191ea4381fd 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -25,6 +25,10 @@ suite("test_routine_load_error","p0") {
                   "multi_table_load_invalid_table",
                 ]
 
+    def kafkaJsonTopics = [
+                  "invalid_json_path",
+                ]
+
     String enabled = context.config.otherConfigs.get("enableKafkaTest")
     String kafka_port = context.config.otherConfigs.get("kafka_port")
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -48,6 +52,15 @@ suite("test_routine_load_error","p0") {
                 producer.send(record)
             }
         }
+        for (String kafkaJsonTopic in kafkaJsonTopics) {
+            def kafkaJson = new 
File("""${context.file.parent}/data/${kafkaJsonTopic}.json""").text
+            def lines = kafkaJson.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaJsonTopic, null, line)
+                producer.send(record)
+            }
+        }
     }
 
     def i = 0
@@ -138,4 +151,101 @@ suite("test_routine_load_error","p0") {
             sql "stop routine load for ${jobName}"
         }
     }
+    
+    // test json path is invalid
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def tableName = "test_invalid_json_path"
+        def jobName = "invalid_json_path"
+        try {
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT                    NOT NULL,
+                k01 array<BOOLEAN>         NULL,
+                k02 array<TINYINT>         NULL,
+                k03 array<SMALLINT>        NULL,
+                k04 array<INT>             NULL,
+                k05 array<BIGINT>          NULL,
+                k06 array<LARGEINT>        NULL,
+                k07 array<FLOAT>           NULL,
+                k08 array<DOUBLE>          NULL,
+                k09 array<DECIMAL>         NULL,
+                k10 array<DECIMALV3>       NULL,
+                k11 array<DATE>            NULL,
+                k12 array<DATETIME>        NULL,
+                k13 array<DATEV2>          NULL,
+                k14 array<DATETIMEV2>      NULL,
+                k15 array<CHAR>            NULL,
+                k16 array<VARCHAR>         NULL,
+                k17 array<STRING>          NULL,
+                kd01 array<BOOLEAN>         NOT NULL DEFAULT "[]",
+                kd02 array<TINYINT>         NOT NULL DEFAULT "[]",
+                kd03 array<SMALLINT>        NOT NULL DEFAULT "[]",
+                kd04 array<INT>             NOT NULL DEFAULT "[]",
+                kd05 array<BIGINT>          NOT NULL DEFAULT "[]",
+                kd06 array<LARGEINT>        NOT NULL DEFAULT "[]",
+                kd07 array<FLOAT>           NOT NULL DEFAULT "[]",
+                kd08 array<DOUBLE>          NOT NULL DEFAULT "[]",
+                kd09 array<DECIMAL>         NOT NULL DEFAULT "[]",
+                kd10 array<DECIMALV3>       NOT NULL DEFAULT "[]",
+                kd11 array<DATE>            NOT NULL DEFAULT "[]",
+                kd12 array<DATETIME>        NOT NULL DEFAULT "[]",
+                kd13 array<DATEV2>          NOT NULL DEFAULT "[]",
+                kd14 array<DATETIMEV2>      NOT NULL DEFAULT "[]",
+                kd15 array<CHAR>            NOT NULL DEFAULT "[]",
+                kd16 array<VARCHAR>         NOT NULL DEFAULT "[]",
+                kd17 array<STRING>          NOT NULL DEFAULT "[]"
+            )
+            UNIQUE KEY(k00)
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+            """
+
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17)
+                PROPERTIES
+                (
+                    "format" = "json",
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200",
+                    "jsonpaths" = "[\'t\',\'a\']"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${jobName}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                log.info("routine load state: 
${res[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${res[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${res[0][17].toString()}".toString())
+                if (state != "PAUSED") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    } 
+                    continue;
+                }
+                log.info("reason of state changed: 
${res[0][17].toString()}".toString())
+                assertTrue(res[0][17].toString().contains("Invalid json path"))
+                break;
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+        }
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to