This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 81d32f6d679 Pipe: Fixed the NPE for pipe heartbeat when there are nodes shutting down (#14584)
81d32f6d679 is described below

commit 81d32f6d67918f9821f77e7929c5eab04faa1177
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 30 17:55:00 2024 +0800

    Pipe: Fixed the NPE for pipe heartbeat when there are nodes shutting down (#14584)
---
 .../manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java  | 4 +---
 .../manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java | 8 +++++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index d960c3fe88a..5b6369a2c3c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -29,8 +29,6 @@ import 
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatistics
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;
 
-import javax.validation.constraints.NotNull;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -106,7 +104,7 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
 
   public void parseHeartbeat(
       final int dataNodeId,
-      @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
+      /* @Nullable */ final List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode,
       /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
       /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
       /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 02ed8cca2fc..547310ce49c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -22,8 +22,6 @@ package 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 
-import javax.validation.constraints.NotNull;
-
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
@@ -37,10 +35,14 @@ public class PipeHeartbeat {
   private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
 
   public PipeHeartbeat(
-      @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
+      /* @Nullable */ final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
       /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
       /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
       /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
+    // Pipe meta may be null for nodes shutting down, return empty heartbeat
+    if (Objects.isNull(pipeMetaByteBufferListFromAgent)) {
+      return;
+    }
     for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
       final PipeMeta pipeMeta =
           
PipeMeta.deserialize4TaskAgent(pipeMetaByteBufferListFromAgent.get(i));

Reply via email to