This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new d5c183482da Pipe: Fix NullPointerException in concurrent event access 
(#16849) (#16872)
d5c183482da is described below

commit d5c183482da82276634f43aeecc76902b4db5156
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Dec 5 15:20:09 2025 +0800

    Pipe: Fix NullPointerException in concurrent event access (#16849) (#16872)
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    (cherry picked from commit 5bc4779ba435d71a00e97d493deed9dbbad3bcac)
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  7 ++---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 30 ++++++++++++++++++----
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 5b917b5b4e5..e53f91390bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -142,9 +142,9 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         throw e;
       } else {
         LOGGER.info(
-            "{} in pipe transfer, ignored because the connector subtask is 
dropped.",
+            "{} in pipe transfer, ignored because the connector subtask is 
dropped.{}",
             e.getClass().getSimpleName(),
-            e);
+            e.getMessage() != null ? " Message: " + e.getMessage() : "");
         clearReferenceCountAndReleaseLastEvent(event);
       }
     } catch (final Exception e) {
@@ -161,7 +161,8 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
             e);
       } else {
         LOGGER.info(
-            "Exception in pipe transfer, ignored because the connector subtask 
is dropped.", e);
+            "Exception in pipe transfer, ignored because the connector subtask 
is dropped.{}",
+            e.getMessage() != null ? " Message: " + e.getMessage() : "");
         clearReferenceCountAndReleaseLastEvent(event);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index c8390a8baa1..9bdeb11a6a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -106,13 +106,20 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   public ByteBuffer getByteBuffer() throws WALPipeException {
-    return insertNode.serializeToByteBuffer();
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      throw new PipeException("InsertNode has been released");
+    }
+    return node.serializeToByteBuffer();
   }
 
   public String getDeviceId() {
-    return Objects.nonNull(insertNode.getDevicePath())
-        ? insertNode.getDevicePath().getFullPath()
-        : null;
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      return null;
+    }
+    final PartialPath targetPath = node.getDevicePath();
+    return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
   }
 
   public long getExtractTime() {
@@ -185,13 +192,21 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      throw new PipeException("InsertNode has been released");
+    }
     return new PipeInsertNodeTabletInsertionEvent(
         insertNode, pipeName, creationTime, pipeTaskMeta, pattern, startTime, 
endTime);
   }
 
   @Override
   public boolean isGeneratedByPipe() {
-    return insertNode.isGeneratedByPipe();
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      throw new PipeException("InsertNode has been released");
+    }
+    return node.isGeneratedByPipe();
   }
 
   @Override
@@ -328,6 +343,9 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
       dataContainers = new ArrayList<>();
       final InsertNode node = getInsertNode();
+      if (Objects.isNull(node)) {
+        throw new PipeException("InsertNode has been released");
+      }
       switch (node.getType()) {
         case INSERT_ROW:
         case INSERT_TABLET:
@@ -393,6 +411,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public String toString() {
+    final InsertNode insertNode = this.insertNode;
     return String.format(
             "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, 
isAligned=%s, isGeneratedByPipe=%s, dataContainers=%s}",
             progressIndex,
@@ -405,6 +424,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public String coreReportMessage() {
+    final InsertNode insertNode = this.insertNode;
     return String.format(
             "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, 
isAligned=%s, isGeneratedByPipe=%s}",
             progressIndex,

Reply via email to