This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7ff2704fba6 [fix](routineload) pause job when json path is invalid (#30282)
7ff2704fba6 is described below

commit 7ff2704fba6d069eaa8b55a76feb97e973d3587f
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Jan 23 23:42:59 2024 +0800

    [fix](routineload) pause job when json path is invalid (#30282)
---
 be/src/common/status.h                                         |  2 ++
 be/src/vec/exec/format/json/new_json_reader.cpp                |  6 +++---
 .../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 10 ++++++++++
 .../java/org/apache/doris/transaction/TransactionState.java    |  3 ++-
 gensrc/thrift/Status.thrift                                    |  1 +
 5 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index f461f8f15e0..15ba73cd646 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -38,6 +38,7 @@ TStatusError(MEM_ALLOC_FAILED);
 TStatusError(BUFFER_ALLOCATION_FAILED);
 TStatusError(INVALID_ARGUMENT);
 TStatusError(INVALID_DATA_FORMAT);
+TStatusError(INVALID_JSON_PATH);
 TStatusError(MINIMUM_RESERVATION_UNAVAILABLE);
 TStatusError(CORRUPTION);
 TStatusError(IO_ERROR);
@@ -409,6 +410,7 @@ public:
     ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
     ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
     ERROR_CTOR(InvalidDataFormat, INVALID_DATA_FORMAT)
+    ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
     ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
     ERROR_CTOR(Corruption, CORRUPTION)
     ERROR_CTOR(IOError, IO_ERROR)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index e99b66b3297..f9f434dc229 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -426,12 +426,12 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
         rapidjson::Document jsonpaths_doc;
         if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError()) {
             if (!jsonpaths_doc.IsArray()) {
-                return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
+                return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
             } else {
                 for (int i = 0; i < jsonpaths_doc.Size(); i++) {
                     const rapidjson::Value& path = jsonpaths_doc[i];
                     if (!path.IsString()) {
-                        return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
+                        return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
                     }
                     std::vector<JsonPath> parsed_paths;
                     JsonFunctions::parse_json_paths(path.GetString(), &parsed_paths);
@@ -439,7 +439,7 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
                 }
             }
         } else {
-            return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
+            return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 62e82d95457..0da102e2212 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1127,6 +1127,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                     String msg;
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
+                            case INVALID_JSON_PATH:
+                                msg = "be " + taskBeId + " abort task,"
+                                        + " task id: " + routineLoadTaskInfo.getId()
+                                        + " job id: " + routineLoadTaskInfo.getJobId()
+                                        + " with reason: " + txnStatusChangeReasonString
+                                        + " please check the jsonpaths";
+                                updateState(JobState.PAUSED,
+                                        new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+                                        false /* not replay */);
+                                return;
                             case OFFSET_OUT_OF_RANGE:
                                 msg = "be " + taskBeId + " abort task,"
                                         + " task id: " + routineLoadTaskInfo.getId()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 9e2054f553e..5eed8c655c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -108,7 +108,8 @@ public class TransactionState implements Writable {
         TIMEOUT,
         OFFSET_OUT_OF_RANGE,
         PAUSE,
-        NO_PARTITIONS;
+        NO_PARTITIONS,
+        INVALID_JSON_PATH;
 
         public static TxnStatusChangeReason fromString(String reasonString) {
             for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) {
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 044c708b6eb..2cc9adb344a 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -76,6 +76,7 @@ enum TStatusCode {
     OLAP_ERR_VERSION_ALREADY_MERGED = 45,
     DATA_QUALITY_ERROR  = 46,
     INVALID_DATA_FORMAT = 47,
+    INVALID_JSON_PATH   = 48,
 
     VEC_EXCEPTION = 50,
     VEC_LOGIC_ERROR = 51,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to