This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5523601c6aa6da3834ba36563f6d5d5384b76813
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Dec 4 09:43:57 2025 +0800

    Pipe: Fix NullPointerException in concurrent event access (#16849)
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    (cherry picked from commit 5bc4779ba435d71a00e97d493deed9dbbad3bcac)
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  7 ++-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 73 +++++++++++++++-------
 2 files changed, 53 insertions(+), 27 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 8a86db3ded5..f5bafc5d857 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -142,9 +142,9 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
         throw e;
       } else {
         LOGGER.info(
-            "{} in pipe transfer, ignored because the connector subtask is dropped.",
+            "{} in pipe transfer, ignored because the connector subtask is dropped.{}",
             e.getClass().getSimpleName(),
-            e);
+            e.getMessage() != null ? " Message: " + e.getMessage() : "");
         clearReferenceCountAndReleaseLastEvent(event);
       }
     } catch (final Exception e) {
@@ -161,7 +161,8 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
             e);
       } else {
         LOGGER.info(
-            "Exception in pipe transfer, ignored because the connector subtask is dropped.", e);
+            "Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
+            e.getMessage() != null ? " Message: " + e.getMessage() : "");
         clearReferenceCountAndReleaseLastEvent(event);
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 63cae75fbbd..f5ae873bbac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -157,16 +157,20 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
   }
 
   public ByteBuffer getByteBuffer() throws WALPipeException {
-    return insertNode.serializeToByteBuffer();
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      throw new PipeException("InsertNode has been released");
+    }
+    return node.serializeToByteBuffer();
   }
 
   public String getDeviceId() {
-    if (Objects.isNull(insertNode)) {
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
       return null;
     }
-    return Objects.nonNull(insertNode.getTargetPath())
-        ? insertNode.getTargetPath().getFullPath()
-        : null;
+    final PartialPath targetPath = node.getTargetPath();
+    return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
   }
 
   public long getExtractTime() {
@@ -244,10 +248,14 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
       final boolean skipIfNoPrivileges,
       final long startTime,
       final long endTime) {
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      throw new PipeException("InsertNode has been released");
+    }
     return new PipeInsertNodeTabletInsertionEvent(
         getRawIsTableModelEvent(),
         getSourceDatabaseNameFromDataRegion(),
-        insertNode,
+        node,
         pipeName,
         creationTime,
         pipeTaskMeta,
@@ -263,7 +271,11 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
 
   @Override
   public boolean isGeneratedByPipe() {
-    return insertNode.isGeneratedByPipe();
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      throw new PipeException("InsertNode has been released");
+    }
+    return node.isGeneratedByPipe();
   }
 
   @Override
@@ -271,17 +283,22 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
     if (skipIfNoPrivileges || !isTableModelEvent()) {
       return;
     }
-    if (Objects.nonNull(insertNode.getTargetPath())) {
-      checkTableName(
-          DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName());
-    } else if (insertNode instanceof InsertRowsNode) {
+    final InsertNode node = insertNode;
+    if (Objects.isNull(node)) {
+      // Event is released, skip privilege check
+      return;
+    }
+    final PartialPath targetPath = node.getTargetPath();
+    if (Objects.nonNull(targetPath)) {
+      checkTableName(DeviceIDFactory.getInstance().getDeviceID(targetPath).getTableName());
+    } else if (node instanceof InsertRowsNode) {
       for (final String tableName :
-          ((InsertRowsNode) insertNode)
+          ((InsertRowsNode) node)
               .getInsertRowNodeList().stream()
                   .map(
-                      node ->
+                      insertRowNode ->
                           DeviceIDFactory.getInstance()
-                              .getDeviceID(node.getTargetPath())
+                              .getDeviceID(insertRowNode.getTargetPath())
                               .getTableName())
                   .collect(Collectors.toSet())) {
         checkTableName(tableName);
@@ -443,6 +460,9 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
 
       eventParsers = new ArrayList<>();
       final InsertNode node = getInsertNode();
+      if (Objects.isNull(node)) {
+        throw new PipeException("InsertNode has been released");
+      }
       switch (node.getType()) {
         case INSERT_ROW:
         case INSERT_TABLET:
@@ -526,11 +546,12 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
 
   @Override
   public String toString() {
+    final InsertNode node = insertNode;
     return String.format(
             "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}",
             progressIndex,
-            Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
-            Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null,
+            Objects.nonNull(node) ? node.isAligned() : null,
+            Objects.nonNull(node) ? node.isGeneratedByPipe() : null,
             eventParsers)
         + " - "
         + super.toString();
@@ -538,11 +559,12 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
 
   @Override
   public String coreReportMessage() {
+    final InsertNode node = insertNode;
     return String.format(
             "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}",
             progressIndex,
-            Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
-            Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null)
+            Objects.nonNull(node) ? node.isAligned() : null,
+            Objects.nonNull(node) ? node.isGeneratedByPipe() : null)
         + " - "
         + super.coreReportMessage();
   }
@@ -567,12 +589,15 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
   // invoked, the event will soon be released.
   @Override
   public long ramBytesUsed() {
-    return bytes > 0
-        ? bytes
-        : (bytes =
-            INSTANCE_SIZE
-                + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
-                + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0));
+    if (bytes > 0) {
+      return bytes;
+    }
+    final InsertNode node = insertNode;
+    bytes =
+        INSTANCE_SIZE
+            + (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node) : 0)
+            + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
+    return bytes;
   }
 
   private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource {

Reply via email to