This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5b881882f28 branch-4.0: [fix](streaming) Fix NPE in StreamingInsertJob
when MetricRepo is not initialized during replay #61253 (#61295)
5b881882f28 is described below
commit 5b881882f28d266795842a729f0910e0a2ebc596
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 13 16:38:26 2026 +0800
branch-4.0: [fix](streaming) Fix NPE in StreamingInsertJob when MetricRepo
is not initialized during replay #61253 (#61295)
Cherry-picked from #61253
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingInsertJob.java | 51 ++++++++++++++--------
1 file changed, 33 insertions(+), 18 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c820b8d532a..0b0faa0f64f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -540,12 +540,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
// and auto resume will automatically wake it up.
this.updateJobStatus(JobStatus.PAUSED);
- MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+ }
}
} finally {
long end = System.currentTimeMillis();
- MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - start);
- MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - start);
+ MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+ }
}
}
@@ -592,7 +596,9 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
this.failureReason = new FailureReason(task.getErrMsg());
- MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+ }
} finally {
writeUnlock();
}
@@ -604,8 +610,11 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
resetFailureInfo(null);
succeedTaskCount.incrementAndGet();
//update metric
- MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
- MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(task.getFinishTimeMs() - task.getStartTimeMs());
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
+ MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(
+ task.getFinishTimeMs() - task.getStartTimeMs());
+ }
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
AbstractStreamingTask nextTask = createStreamingTask();
@@ -617,7 +626,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
}
- private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment, boolean isReplay) {
if (this.jobStatistic == null) {
this.jobStatistic = new StreamingJobStatistic();
}
@@ -628,11 +637,13 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
//update metric
- MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
- MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+ if (MetricRepo.isInit && !isReplay) {
+ MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+ MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+ }
}
- private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+ private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment, boolean isReplay) {
if (this.jobStatistic == null) {
this.jobStatistic = new StreamingJobStatistic();
}
@@ -643,8 +654,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
//update metric
- MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
- MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+ if (MetricRepo.isInit && !isReplay) {
+ MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+ MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+ }
}
private void updateJobStatisticAndOffset(CommitOffsetRequest offsetRequest) {
@@ -668,9 +681,11 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
//update metric
- MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
- MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
- MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
+ MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
+ MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+ }
}
@Override
@@ -1006,7 +1021,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
StreamingTaskTxnCommitAttachment attachment =
(StreamingTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
- updateJobStatisticAndOffset(attachment);
+ updateJobStatisticAndOffset(attachment, false);
}
@Override
@@ -1014,7 +1029,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
StreamingTaskTxnCommitAttachment attachment =
(StreamingTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
- updateJobStatisticAndOffset(attachment);
+ updateJobStatisticAndOffset(attachment, true);
succeedTaskCount.incrementAndGet();
}
@@ -1058,7 +1073,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
StreamingTaskTxnCommitAttachment commitAttach =
new StreamingTaskTxnCommitAttachment(response.getCommitAttach());
- updateCloudJobStatisticAndOffset(commitAttach);
+ updateCloudJobStatisticAndOffset(commitAttach, true);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]