This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-0.15 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 42c93ff56bd4c097a178ddcd3757c8f224711704 Author: Mingyu Chen <[email protected]> AuthorDate: Thu Nov 11 15:41:13 2021 +0800 [RoutineLoad] Add "runningTxns" field in SHOW ROUTINE LOAD result (#6986) Add a new field `runningTxns` in the result of `SHOW ROUTINE LOAD`. e.g.: ``` Id: 11001 Name: test4 CreateTime: 2021-11-02 00:04:54 PauseTime: NULL EndTime: NULL DbName: default_cluster:db1 TableName: tbl1 State: RUNNING DataSourceType: KAFKA CurrentTaskNum: 1 JobProperties: {xxx} CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"test4"} Statistic: {"receivedBytes":6,"runningTxns":[1001, 1002],"errorRows":0,"committedTaskNum":1,"loadedRows":2,"loadRowsRate":0,"abortedTaskNum":13,"errorRowsAfterResumed":0,"totalRows":2,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":20965} Progress: {"0":"10"} ReasonOfStateChanged: ErrorLogUrls: OtherMsg: ``` So that users can view the status of the corresponding transactions of this job by executing `show transaction where id=xx`. --- .../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 2 ++ .../org/apache/doris/load/routineload/RoutineLoadStatistic.java | 7 +++++++ .../org/apache/doris/load/routineload/RoutineLoadTaskInfo.java | 1 + 3 files changed, 10 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index b566029..a45b581 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -912,6 +912,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl writeLock(); try { + this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId()); if (state != JobState.RUNNING) { // job is not running, nothing need to be done return; @@ -963,6 +964,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl throws UserException { long taskBeId = -1L; try { + this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId()); if (txnOperated) { // step0: find task in job Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter( diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java index c0b3b06..7a5ad3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java @@ -22,12 +22,14 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Map; +import java.util.Set; public class RoutineLoadStatistic implements Writable { /* @@ -62,6 +64,10 @@ public class RoutineLoadStatistic implements Writable { @SerializedName(value = "abortedTaskNum") public long abortedTaskNum = 0; + // Save all transactions current running. Including PREPARE, COMMITTED. + // No need to persist, only for tracing txn of routine load job. 
+ public Set<Long> runningTxnIds = Sets.newHashSet(); + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); @@ -87,6 +93,7 @@ public class RoutineLoadStatistic implements Writable { / this.totalTaskExcutionTimeMs * 1000)); summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum)); summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum)); + summary.put("runningTxns", runningTxnIds); return summary; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index b535b94..50acd8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -189,6 +189,7 @@ public abstract class RoutineLoadTaskInfo { DebugUtil.printId(id), jobId, e); throw e; } + routineLoadJob.jobStatistic.runningTxnIds.add(txnId); return true; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
