This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a02be87c09d [fix](broker) fix no error url when broker data quality error (#35643)
a02be87c09d is described below
commit a02be87c09d5673712d4163ddd564a446f39786e
Author: xueweizhang <[email protected]>
AuthorDate: Tue Jun 4 23:19:51 2024 +0800
[fix](broker) fix no error url when broker data quality error (#35643)
---
.../main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 3 ++-
.../apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java | 9 ++++++++-
.../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 6 ++++--
.../suites/load_p0/broker_load/test_etl_failed.groovy | 2 +-
4 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 275cd371cdd..2ffe85bb36a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
@@ -320,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob {
}
// check data quality
- if (!checkDataQuality()) {
+ if (!checkDataQuality() || attachment.getStatus().getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) {
cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
DataQualityException.QUALITY_FAIL_MSG), true,
true);
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
index 7aefd332a29..e5f8973d33a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
+import org.apache.doris.common.Status;
import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
@@ -29,15 +30,17 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment {
private String trackingUrl;
private List<TabletCommitInfo> commitInfoList;
List<ErrorTabletInfo> errorTabletInfos;
+ private Status status = new Status();
public BrokerLoadingTaskAttachment(long taskId, Map<String, String>
counters, String trackingUrl,
List<TabletCommitInfo> commitInfoList,
- List<ErrorTabletInfo> errorTabletInfos) {
+ List<ErrorTabletInfo> errorTabletInfos, Status status) {
super(taskId);
this.trackingUrl = trackingUrl;
this.counters = counters;
this.commitInfoList = commitInfoList;
this.errorTabletInfos = errorTabletInfos;
+ this.status = status;
}
public String getCounter(String key) {
@@ -55,4 +58,8 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment {
public List<ErrorTabletInfo> getErrorTabletInfos() {
return errorTabletInfos;
}
+
+ public Status getStatus() {
+ return status;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 71823f84d32..64b8e9a037f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -37,6 +37,7 @@ import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryType;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
@@ -198,13 +199,14 @@ public class LoadLoadingTask extends LoadTask {
curCoordinator.exec();
if (curCoordinator.join(waitSecond)) {
Status status = curCoordinator.getExecStatus();
- if (status.ok()) {
+ if (status.ok() || status.getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) {
attachment = new BrokerLoadingTaskAttachment(signature,
curCoordinator.getLoadCounters(),
curCoordinator.getTrackingUrl(),
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()),
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()
- .stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())));
+ .stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())),
+ status);
curCoordinator.getErrorTabletInfos().clear();
} else {
throw new LoadException(status.getErrorMsg());
diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
index 3b4eac874d5..928b4e38542 100644
--- a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
@@ -63,7 +63,7 @@ suite("test_etl_failed", "load_p0") {
assertTrue(1 == 2, "etl should be failed")
break;
}
- if (result[0][2].equals("CANCELLED")) {
+ if (result[0][2].equals("CANCELLED") && result[0][13].contains("_load_error_log")) {
break;
}
Thread.sleep(1000)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]