This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7ff2704fba6 [fix](routineload) pause job when json path is invalid
(#30282)
7ff2704fba6 is described below
commit 7ff2704fba6d069eaa8b55a76feb97e973d3587f
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Jan 23 23:42:59 2024 +0800
[fix](routineload) pause job when json path is invalid (#30282)
---
be/src/common/status.h | 2 ++
be/src/vec/exec/format/json/new_json_reader.cpp | 6 +++---
.../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 10 ++++++++++
.../java/org/apache/doris/transaction/TransactionState.java | 3 ++-
gensrc/thrift/Status.thrift | 1 +
5 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index f461f8f15e0..15ba73cd646 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -38,6 +38,7 @@ TStatusError(MEM_ALLOC_FAILED);
TStatusError(BUFFER_ALLOCATION_FAILED);
TStatusError(INVALID_ARGUMENT);
TStatusError(INVALID_DATA_FORMAT);
+TStatusError(INVALID_JSON_PATH);
TStatusError(MINIMUM_RESERVATION_UNAVAILABLE);
TStatusError(CORRUPTION);
TStatusError(IO_ERROR);
@@ -409,6 +410,7 @@ public:
ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
ERROR_CTOR(InvalidDataFormat, INVALID_DATA_FORMAT)
+ ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
ERROR_CTOR(Corruption, CORRUPTION)
ERROR_CTOR(IOError, IO_ERROR)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index e99b66b3297..f9f434dc229 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -426,12 +426,12 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
rapidjson::Document jsonpaths_doc;
if (!jsonpaths_doc.Parse(_jsonpaths.c_str(),
_jsonpaths.length()).HasParseError()) {
if (!jsonpaths_doc.IsArray()) {
- return Status::InvalidArgument("Invalid json path: {}",
_jsonpaths);
+ return Status::InvalidJsonPath("Invalid json path: {}",
_jsonpaths);
} else {
for (int i = 0; i < jsonpaths_doc.Size(); i++) {
const rapidjson::Value& path = jsonpaths_doc[i];
if (!path.IsString()) {
- return Status::InvalidArgument("Invalid json path:
{}", _jsonpaths);
+ return Status::InvalidJsonPath("Invalid json path:
{}", _jsonpaths);
}
std::vector<JsonPath> parsed_paths;
JsonFunctions::parse_json_paths(path.GetString(),
&parsed_paths);
@@ -439,7 +439,7 @@ Status NewJsonReader::_parse_jsonpath_and_json_root() {
}
}
} else {
- return Status::InvalidArgument("Invalid json path: {}",
_jsonpaths);
+ return Status::InvalidJsonPath("Invalid json path: {}",
_jsonpaths);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 62e82d95457..0da102e2212 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1127,6 +1127,16 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
String msg;
if (txnStatusChangeReason != null) {
switch (txnStatusChangeReason) {
+ case INVALID_JSON_PATH:
+ msg = "be " + taskBeId + " abort task,"
+ + " task id: " +
routineLoadTaskInfo.getId()
+ + " job id: " +
routineLoadTaskInfo.getJobId()
+ + " with reason: " +
txnStatusChangeReasonString
+ + " please check the jsonpaths";
+ updateState(JobState.PAUSED,
+ new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ false /* not replay */);
+ return;
case OFFSET_OUT_OF_RANGE:
msg = "be " + taskBeId + " abort task,"
+ " task id: " +
routineLoadTaskInfo.getId()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 9e2054f553e..5eed8c655c9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -108,7 +108,8 @@ public class TransactionState implements Writable {
TIMEOUT,
OFFSET_OUT_OF_RANGE,
PAUSE,
- NO_PARTITIONS;
+ NO_PARTITIONS,
+ INVALID_JSON_PATH;
public static TxnStatusChangeReason fromString(String reasonString) {
for (TxnStatusChangeReason txnStatusChangeReason :
TxnStatusChangeReason.values()) {
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 044c708b6eb..2cc9adb344a 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -76,6 +76,7 @@ enum TStatusCode {
OLAP_ERR_VERSION_ALREADY_MERGED = 45,
DATA_QUALITY_ERROR = 46,
INVALID_DATA_FORMAT = 47,
+ INVALID_JSON_PATH = 48,
VEC_EXCEPTION = 50,
VEC_LOGIC_ERROR = 51,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]