This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new d61c7deea44 branch-4.0: [fix](streaming job) fix streaming job
statistic never update #56667 (#56766)
d61c7deea44 is described below
commit d61c7deea44b3a59114eb8bed7b3ac318d7cc712
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 11 15:57:11 2025 +0800
branch-4.0: [fix](streaming job) fix streaming job statistic never update
#56667 (#56766)
Cherry-picked from #56667
Co-authored-by: hui lai <[email protected]>
---
.../transaction/CloudGlobalTransactionMgr.java | 11 +++++--
.../apache/doris/cloud/transaction/TxnUtil.java | 6 ++++
.../insert/streaming/StreamingInsertJob.java | 11 ++++---
.../insert/streaming/StreamingInsertTask.java | 2 +-
.../StreamingTaskTxnCommitAttachment.java | 6 ++++
.../job/offset/s3/S3SourceOffsetProvider.java | 6 ++--
.../apache/doris/load/loadv2/InsertLoadJob.java | 4 +++
.../commands/insert/AbstractInsertExecutor.java | 20 +++++++++++-
.../commands/insert/InsertIntoTableCommand.java | 8 +++--
.../insert/OlapGroupCommitInsertExecutor.java | 4 +--
.../plans/commands/insert/OlapInsertExecutor.java | 36 +++++++++++++++++-----
.../commands/insert/OlapTxnInsertExecutor.java | 4 +--
.../streaming_job/test_streaming_insert_job.groovy | 20 ++++++------
13 files changed, 105 insertions(+), 33 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c3b9c321fa9..fa3617a96a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -623,16 +623,18 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
} else if (txnCommitAttachment instanceof
StreamingTaskTxnCommitAttachment) {
StreamingTaskTxnCommitAttachment
streamingTaskTxnCommitAttachment =
(StreamingTaskTxnCommitAttachment)
txnCommitAttachment;
- TxnStateChangeCallback cb =
callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getTaskId());
+ TxnStateChangeCallback cb =
callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getJobId());
+ TxnCommitAttachment commitAttachment = null;
if (cb != null) {
// use a temporary transaction state to do before commit
check,
// what actually works is the transactionId
TransactionState tmpTxnState = new TransactionState();
tmpTxnState.setTransactionId(transactionId);
cb.beforeCommitted(tmpTxnState);
+ commitAttachment = tmpTxnState.getTxnCommitAttachment();
}
builder.setCommitAttachment(TxnUtil
-
.streamingTaskTxnCommitAttachmentToPb(streamingTaskTxnCommitAttachment));
+
.streamingTaskTxnCommitAttachmentToPb((StreamingTaskTxnCommitAttachment)
commitAttachment));
} else {
throw new UserException("invalid txnCommitAttachment");
}
@@ -676,6 +678,11 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
if (txnCommitAttachment != null && txnCommitAttachment instanceof
RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment =
(RLTaskTxnCommitAttachment) txnCommitAttachment;
callbackId = rlTaskTxnCommitAttachment.getJobId();
+ } else if (txnCommitAttachment != null
+ && txnCommitAttachment instanceof
StreamingTaskTxnCommitAttachment) {
+ StreamingTaskTxnCommitAttachment
streamingTaskTxnCommitAttachment =
+ (StreamingTaskTxnCommitAttachment) txnCommitAttachment;
+ callbackId = streamingTaskTxnCommitAttachment.getJobId();
} else if (txnState != null) {
callbackId = txnState.getCallbackId();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 06cd25d1357..b947ef0649e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -373,6 +373,12 @@ public class TxnUtil {
commitAttachment =
TxnUtil.rtTaskTxnCommitAttachmentFromPb(txnInfo.getCommitAttachment());
}
+
+ if (txnInfo.getCommitAttachment().getType()
+ ==
TxnCommitAttachmentPB.Type.STREAMING_TASK_TXN_COMMIT_ATTACHMENT) {
+ commitAttachment =
+
TxnUtil.streamingTaskTxnCommitAttachmentFromPb(txnInfo.getCommitAttachment());
+ }
}
long prepareTime = txnInfo.hasPrepareTime() ? txnInfo.getPrepareTime()
: -1;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f70cbc37fca..0a9b27e7632 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -41,6 +41,7 @@ import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
@@ -295,8 +296,6 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
StreamingInsertTask nextTask = createStreamingInsertTask();
this.runningStreamTask = nextTask;
- //todo: maybe fetch from txn attachment?
- offsetProvider.updateOffset(task.getRunningOffset());
} finally {
writeUnlock();
}
@@ -417,8 +416,12 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
taskIds.add(runningStreamTask.getTaskId());
// todo: Check whether the taskid of runningtask is consistent
with the taskid associated with txn
- // todo: need get loadStatistic, load manager statistic is empty
- LoadStatistic loadStatistic = new LoadStatistic();
+ List<LoadJob> loadJobs =
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
+ if (loadJobs.size() == 0) {
+ throw new TransactionException("load job not found, insert job
id is " + runningStreamTask.getTaskId());
+ }
+ LoadJob loadJob = loadJobs.get(0);
+ LoadStatistic loadStatistic = loadJob.getLoadStatistic();
txnState.setTxnCommitAttachment(new
StreamingTaskTxnCommitAttachment(
getJobId(),
runningStreamTask.getTaskId(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d1a72b8bd24..433e372abf5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -127,6 +127,7 @@ public class StreamingInsertTask {
this.runningOffset = offsetProvider.getNextOffset(jobProperties,
originTvfProps);
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new
NereidsParser().parseSingle(sql);
+ baseCommand.setJobId(getTaskId());
StmtExecutor baseStmtExecutor =
new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand,
ctx.getStatementContext()));
baseCommand.initPlan(ctx, baseStmtExecutor, false);
@@ -135,7 +136,6 @@ public class StreamingInsertTask {
}
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand,
runningOffset);
this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER
+ getTaskId()));
- this.taskCommand.setJobId(getTaskId());
this.stmtExecutor = new StmtExecutor(ctx, new
LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index b3f6a2c0b1b..01fdb3e16e4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -23,9 +23,14 @@ import org.apache.doris.transaction.TxnCommitAttachment;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
+import lombok.Setter;
public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
+ public StreamingTaskTxnCommitAttachment() {
+ super(TransactionState.LoadJobSourceType.STREAMING_JOB);
+ }
+
public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
long scannedRows, long loadBytes, long numFiles, long
fileBytes, String offset) {
super(TransactionState.LoadJobSourceType.STREAMING_JOB);
@@ -48,6 +53,7 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
}
@Getter
+ @Setter
private long jobId;
@Getter
private long taskId;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 74743d09399..01986b40c50 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -139,8 +139,10 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
}
return plan;
});
- return new InsertIntoTableCommand((LogicalPlan) rewritePlan,
Optional.empty(), Optional.empty(),
- Optional.empty(), true, Optional.empty());
+ InsertIntoTableCommand insertIntoTableCommand = new
InsertIntoTableCommand((LogicalPlan) rewritePlan,
+ Optional.empty(), Optional.empty(), Optional.empty(), true,
Optional.empty());
+ insertIntoTableCommand.setJobId(originCommand.getJobId());
+ return insertIntoTableCommand;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
index e52a87fe926..a8eec265b5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -57,6 +57,10 @@ public class InsertLoadJob extends LoadJob {
super(EtlJobType.INSERT, dbId, label);
}
+ public InsertLoadJob(long dbId, String label, long jobId) {
+ super(EtlJobType.INSERT, dbId, label, jobId);
+ }
+
public InsertLoadJob(String label, long transactionId, long dbId, long
tableId,
long createTimestamp, String failMsg, String trackingUrl, String
firstErrorMsg,
UserIdentity userInfo) throws MetaNotFoundException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index bc032a9d61f..9b6226b417a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -71,7 +71,7 @@ public abstract class AbstractInsertExecutor {
protected long txnId = INVALID_TXN_ID;
/**
- * Constructor
+ * Randomly generate job ID.
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String
labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
@@ -87,6 +87,24 @@ public abstract class AbstractInsertExecutor {
this.emptyInsert = emptyInsert;
}
+ /**
+ * Specify job ID.
+ */
+ public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String
labelName, NereidsPlanner planner,
+ Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
long jobId) {
+ this.ctx = ctx;
+ this.database = table.getDatabase();
+ this.insertLoadJob = new InsertLoadJob(database.getId(), labelName,
jobId);
+ ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
+ this.coordinator = EnvFactory.getInstance().createCoordinator(
+ ctx, planner, ctx.getStatsErrorEstimator(),
insertLoadJob.getId());
+ this.labelName = labelName;
+ this.table = table;
+ this.insertCtx = insertCtx;
+ this.emptyInsert = emptyInsert;
+ this.jobId = jobId;
+ }
+
public Coordinator getCoordinator() {
return coordinator;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index a0a43814486..6e6e499b089 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -132,6 +132,7 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
this.cte = cte;
this.needNormalizePlan = needNormalizePlan;
this.branchName = branchName;
+ this.jobId = Env.getCurrentEnv().getNextId();
}
/**
@@ -356,7 +357,8 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
planner,
dataSink,
physicalSink,
- () -> new OlapTxnInsertExecutor(ctx, olapTable,
label, planner, insertCtx, emptyInsert)
+ () -> new OlapTxnInsertExecutor(
+ ctx, olapTable, label, planner, insertCtx,
emptyInsert, jobId)
);
} else if (ctx.isGroupCommit()) {
Backend groupCommitBackend = Env.getCurrentEnv()
@@ -369,7 +371,7 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
dataSink,
physicalSink,
() -> new OlapGroupCommitInsertExecutor(
- ctx, olapTable, label, planner, insertCtx,
emptyInsert, groupCommitBackend
+ ctx, olapTable, label, planner, insertCtx,
emptyInsert, groupCommitBackend, jobId
)
);
} else {
@@ -377,7 +379,7 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
planner,
dataSink,
physicalSink,
- () -> new OlapInsertExecutor(ctx, olapTable,
label, planner, insertCtx, emptyInsert)
+ () -> new OlapInsertExecutor(ctx, olapTable,
label, planner, insertCtx, emptyInsert, jobId)
);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index a2c7e280da6..803b009d136 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -65,8 +65,8 @@ public class OlapGroupCommitInsertExecutor extends
OlapInsertExecutor {
public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx,
- boolean emptyInsert, Backend backend) {
- super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+ boolean emptyInsert, Backend backend, long jobId) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
this.groupCommitBackend = backend;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 357523a7b4b..e7d19f07577 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
+import
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -59,6 +60,7 @@ import
org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.transaction.TxnCommitAttachment;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@@ -84,8 +86,9 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
* constructor
*/
public OlapInsertExecutor(ConnectContext ctx, Table table,
- String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
- super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+ String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
+ long jobId) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
this.olapTable = (OlapTable) table;
}
@@ -190,6 +193,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
@Override
protected void onComplete() throws UserException {
+ TxnCommitAttachment txnCommitAttachment = buildTxnAttachment();
setTxnCallbackId();
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
try {
@@ -204,7 +208,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
database, Lists.newArrayList((Table) table),
txnId,
TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
- ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
+ ctx.getSessionVariable().getInsertVisibleTimeoutMs(),
txnCommitAttachment)) {
txnStatus = TransactionStatus.VISIBLE;
} else {
txnStatus = TransactionStatus.COMMITTED;
@@ -231,13 +235,31 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
- StreamingInsertTask streamingInsertTask =
-
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
- if (streamingInsertTask != null) {
- state.setCallbackId(streamingInsertTask.getJobId());
+ StreamingInsertTask task = Env.getCurrentEnv()
+ .getJobManager()
+ .getStreamingTaskManager()
+ .getStreamingInsertTaskById(jobId);
+ if (task != null) {
+ state.setCallbackId(task.getJobId());
}
}
+ private TxnCommitAttachment buildTxnAttachment() {
+ if (!Config.isCloudMode()) {
+ return null;
+ }
+ StreamingInsertTask task = Env.getCurrentEnv()
+ .getJobManager()
+ .getStreamingTaskManager()
+ .getStreamingInsertTaskById(jobId);
+ if (task == null) {
+ return null;
+ }
+ StreamingTaskTxnCommitAttachment attachment = new
StreamingTaskTxnCommitAttachment();
+ attachment.setJobId(task.getJobId());
+ return attachment;
+ }
+
@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
index 1512eca16e2..aa095d3f91a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
@@ -42,8 +42,8 @@ public class OlapTxnInsertExecutor extends OlapInsertExecutor
{
public OlapTxnInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx,
- boolean emptyInsert) {
- super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+ boolean emptyInsert, long jobId) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
txnStatus = TransactionStatus.PREPARE;
}
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 6865cedd842..5695fa31c92 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -65,8 +65,8 @@ suite("test_streaming_insert_job") {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(1, SECONDS).until(
{
- print("check success task count")
def jobSuccendCount = sql """ select SucceedTaskCount from
jobs("type"="insert") where Name like '%${jobName}%' and
ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
// check job status and succeed task count larger than 2
jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
}
@@ -74,13 +74,13 @@ suite("test_streaming_insert_job") {
} catch (Exception ex){
def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
- println("show job: " + showjob)
- println("show task: " + showtask)
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
throw ex;
}
def jobResult = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
- println("show success job: " + jobResult)
+ log.info("show success job: " + jobResult)
qt_select """ SELECT * FROM ${tableName} order by c1 """
@@ -96,12 +96,14 @@ suite("test_streaming_insert_job") {
assert pauseShowTask.size() == 0
- def jobOffset = sql """
- select currentOffset, endoffset from jobs("type"="insert") where
Name='${jobName}'
+ def jobInfo = sql """
+ select currentOffset, endoffset, loadStatistic from
jobs("type"="insert") where Name='${jobName}'
"""
- assert jobOffset.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobOffset.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
-
+ log.info("jobInfo: " + jobInfo)
+ assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+
// alter streaming job
sql """
ALTER JOB ${jobName}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]