This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 81d32f6d679 Pipe: Fixed the NPE for pipe heartbeat when there are
nodes shutting down (#14584)
81d32f6d679 is described below
commit 81d32f6d67918f9821f77e7929c5eab04faa1177
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 30 17:55:00 2024 +0800
Pipe: Fixed the NPE for pipe heartbeat when there are nodes shutting down
(#14584)
---
.../manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java | 4 +---
.../manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java | 8 +++++---
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index d960c3fe88a..5b6369a2c3c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -29,8 +29,6 @@ import
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatistics
import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;
-import javax.validation.constraints.NotNull;
-
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -106,7 +104,7 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
public void parseHeartbeat(
final int dataNodeId,
- @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
+ /* @Nullable */ final List<ByteBuffer>
pipeMetaByteBufferListFromDataNode,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 02ed8cca2fc..547310ce49c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -22,8 +22,6 @@ package
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
-import javax.validation.constraints.NotNull;
-
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
@@ -37,10 +35,14 @@ public class PipeHeartbeat {
private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
public PipeHeartbeat(
- @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
+ /* @Nullable */ final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
+ // Pipe meta may be null for nodes shutting down, return empty heartbeat
+ if (Objects.isNull(pipeMetaByteBufferListFromAgent)) {
+ return;
+ }
for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
final PipeMeta pipeMeta =
PipeMeta.deserialize4TaskAgent(pipeMetaByteBufferListFromAgent.get(i));