This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 382664fe551 [fix](streaming) Fix NPE in StreamingInsertJob when 
MetricRepo is not initialized during replay (#61253)
382664fe551 is described below

commit 382664fe551ac89fb2e21d2e42ec1e2fc5ad6345
Author: wudi <[email protected]>
AuthorDate: Fri Mar 13 10:55:44 2026 +0800

    [fix](streaming) Fix NPE in StreamingInsertJob when MetricRepo is not 
initialized during replay (#61253)
    
    ### What problem does this PR solve?
    
    #### Problem
    
    `StreamingInsertJob.replayOnCommitted()` throws a `NullPointerException`
    during FE replay:
    
    ```
      java.lang.NullPointerException: Cannot invoke
      "org.apache.doris.metric.LongCounterMetric.increase(java.lang.Long)"
        because "org.apache.doris.metric.MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS" is null
          at StreamingInsertJob.updateJobStatisticAndOffset(StreamingInsertJob.java:634)
          at StreamingInsertJob.replayOnCommitted(StreamingInsertJob.java:1020)
          at TransactionState.replaySetTransactionStatus(TransactionState.java:589)
          at DatabaseTransactionMgr.replayUpsertTransactionState(DatabaseTransactionMgr.java:2636)
          at GlobalTransactionMgr.replayUpsertTransactionState(GlobalTransactionMgr.java:952)
    ```
    
    The root cause is that `MetricRepo` may not yet be initialized when the
    FE replays transaction logs during startup, so its counter fields are
    still null. `StreamingInsertJob` nevertheless calls the metric update
    methods unconditionally, which leads to the NPE.
    
    #### Fix
    
      Two separate changes are applied to `StreamingInsertJob`:
    
    1. **Skip metric updates during replay**: the
    `StreamingTaskTxnCommitAttachment` overload of
    `updateJobStatisticAndOffset` and `updateCloudJobStatisticAndOffset`
    now accept an `isReplay` boolean parameter. The call sites in
    `replayOnCommitted` and `replayOnCloudMode` pass `true`, while
    `afterCommitted` passes `false`.
    
    2. **Guard all metric calls with `MetricRepo.isInit`**: every
    `MetricRepo.COUNTER_STREAMING_JOB_*` call site is now wrapped in a
    guard: `if (MetricRepo.isInit && !isReplay)` in the two methods that
    take `isReplay`, and `if (MetricRepo.isInit)` at all other call sites.
    This prevents the NPE if `MetricRepo` has not been fully initialized.
---
 .../insert/streaming/StreamingInsertJob.java       | 51 ++++++++++++++--------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index e8d168eaa43..c801eb16ddb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -543,12 +543,16 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 // and auto resume will automatically wake it up.
                 this.updateJobStatus(JobStatus.PAUSED);
 
-                
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+                if (MetricRepo.isInit) {
+                    
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+                }
             }
         } finally {
             long end = System.currentTimeMillis();
-            MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - 
start);
-            MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+            if (MetricRepo.isInit) {
+                
MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - start);
+                MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+            }
         }
     }
 
@@ -595,7 +599,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             failedTaskCount.incrementAndGet();
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             this.failureReason = new FailureReason(task.getErrMsg());
-            MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+            if (MetricRepo.isInit) {
+                
MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+            }
         } finally {
             writeUnlock();
         }
@@ -607,8 +613,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             resetFailureInfo(null);
             succeedTaskCount.incrementAndGet();
             //update metric
-            MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
-            
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(task.getFinishTimeMs()
 - task.getStartTimeMs());
+            if (MetricRepo.isInit) {
+                
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
+                MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(
+                        task.getFinishTimeMs() - task.getStartTimeMs());
+            }
 
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             AbstractStreamingTask nextTask = createStreamingTask();
@@ -620,7 +629,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
-    private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment 
attachment) {
+    private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment 
attachment, boolean isReplay) {
         if (this.jobStatistic == null) {
             this.jobStatistic = new StreamingJobStatistic();
         }
@@ -631,11 +640,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
 
         //update metric
-        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+        if (MetricRepo.isInit && !isReplay) {
+            
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+        }
     }
 
-    private void 
updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+    private void 
updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment, 
boolean isReplay) {
         if (this.jobStatistic == null) {
             this.jobStatistic = new StreamingJobStatistic();
         }
@@ -646,8 +657,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
 
         //update metric
-        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+        if (MetricRepo.isInit && !isReplay) {
+            
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+        }
     }
 
     private void updateJobStatisticAndOffset(CommitOffsetRequest 
offsetRequest) {
@@ -671,9 +684,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
 
         //update metric
-        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+        if (MetricRepo.isInit) {
+            
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+        }
     }
 
     @Override
@@ -1009,7 +1024,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), 
txnState);
         StreamingTaskTxnCommitAttachment attachment =
                 (StreamingTaskTxnCommitAttachment) 
txnState.getTxnCommitAttachment();
-        updateJobStatisticAndOffset(attachment);
+        updateJobStatisticAndOffset(attachment, false);
     }
 
     @Override
@@ -1017,7 +1032,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), 
txnState);
         StreamingTaskTxnCommitAttachment attachment =
                 (StreamingTaskTxnCommitAttachment) 
txnState.getTxnCommitAttachment();
-        updateJobStatisticAndOffset(attachment);
+        updateJobStatisticAndOffset(attachment, true);
         succeedTaskCount.incrementAndGet();
     }
 
@@ -1061,7 +1076,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
         StreamingTaskTxnCommitAttachment commitAttach =
                 new 
StreamingTaskTxnCommitAttachment(response.getCommitAttach());
-        updateCloudJobStatisticAndOffset(commitAttach);
+        updateCloudJobStatisticAndOffset(commitAttach, true);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to