This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3a4900f6ac0739bbeb90879d30e953c6be012923
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jul 28 10:03:13 2025 +0800

    Pipe: Optimized the floating memory calculation & Fixed potential NPE in insertRows privilege checking (#16039)
    
    * simplify
    
    * comp
    
    * Refactor
    
    * some
    
    * fix-potential-npe
    
    * cache
    
    (cherry picked from commit a4cadf75957e2c9680847f4466f358c1aff3b9f1)
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 71 ++++++++++------------
 .../event/realtime/PipeRealtimeEventFactory.java   | 19 +-----
 2 files changed, 33 insertions(+), 57 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index ffa4da79466..855f33b12e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -64,10 +64,8 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -84,35 +82,27 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
           + RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class)
           + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class)
           + RamUsageEstimator.shallowSizeOf(Boolean.class);
-  private static final long SET_SIZE = 
RamUsageEstimator.shallowSizeOfInstance(HashSet.class);
 
   private final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock;
   private volatile List<Tablet> tablets;
 
   private List<TabletInsertionEventParser> eventParsers;
 
-  private final PartialPath devicePath;
   private InsertNode insertNode;
 
   private ProgressIndex progressIndex;
-
-  // Only useful for insertRows
-  private final Set<String> tableNames;
+  private long bytes = Long.MIN_VALUE;
 
   private long extractTime = 0;
 
   public PipeInsertNodeTabletInsertionEvent(
       final Boolean isTableModel,
       final String databaseNameFromDataRegion,
-      final InsertNode insertNode,
-      final PartialPath devicePath,
-      final Set<String> tableNames) {
+      final InsertNode insertNode) {
     this(
         isTableModel,
         databaseNameFromDataRegion,
         insertNode,
-        devicePath,
-        tableNames,
         null,
         0,
         null,
@@ -128,8 +118,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
       final Boolean isTableModelEvent,
       final String databaseNameFromDataRegion,
       final InsertNode insertNode,
-      final PartialPath devicePath,
-      final Set<String> tableNames,
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
@@ -152,9 +140,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
         isTableModelEvent,
         databaseNameFromDataRegion);
     this.insertNode = insertNode;
-    // Record device path here so there's no need to get it from InsertNode 
cache later.
-    this.devicePath = devicePath;
-    this.tableNames = tableNames;
     this.progressIndex = insertNode.getProgressIndex();
 
     this.allocatedMemoryBlock = new AtomicReference<>();
@@ -169,7 +154,12 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
   }
 
   public String getDeviceId() {
-    return Objects.nonNull(devicePath) ? devicePath.getFullPath() : null;
+    if (Objects.isNull(insertNode)) {
+      return null;
+    }
+    return Objects.nonNull(insertNode.getTargetPath())
+        ? insertNode.getTargetPath().getFullPath()
+        : null;
   }
 
   /////////////////////////// EnrichedEvent ///////////////////////////
@@ -245,8 +235,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
         getRawIsTableModelEvent(),
         getSourceDatabaseNameFromDataRegion(),
         insertNode,
-        devicePath,
-        tableNames,
         pipeName,
         creationTime,
         pipeTaskMeta,
@@ -268,10 +256,19 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
     if (skipIfNoPrivileges || !isTableModelEvent()) {
       return;
     }
-    if (Objects.nonNull(devicePath)) {
-      checkTableName(DeviceIDFactory.getInstance().getDeviceID(devicePath).getTableName());
-    } else {
-      for (final String tableName : tableNames) {
+    if (Objects.nonNull(insertNode.getTargetPath())) {
+      checkTableName(
+          DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName());
+    } else if (insertNode instanceof InsertRowsNode) {
+      for (final String tableName :
+          ((InsertRowsNode) insertNode)
+              .getInsertRowNodeList().stream()
+                  .map(
+                      node ->
+                          DeviceIDFactory.getInstance()
+                              .getDeviceID(node.getTargetPath())
+                              .getTableName())
+                  .collect(Collectors.toSet())) {
         checkTableName(tableName);
       }
     }
@@ -285,9 +282,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
       throw new AccessDeniedException(
           String.format(
               "No privilege for SELECT for user %s at table %s.%s",
-              userName,
-              tableModelDatabaseName,
-              
DeviceIDFactory.getInstance().getDeviceID(devicePath).getTableName()));
+              userName, tableModelDatabaseName, tableName));
     }
   }
 
@@ -550,20 +545,18 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
   }
 
   // Notes:
-  // 1. We only consider insertion event's memory for degrade and restart, because degrade/restart
-  // may not be of use for releasing other events' memory.
-  // 2. We do not consider eventParsers because they may not exist and if it is invoked, the event
-  // will soon be released.
+  // 1. We only consider insertion event's memory for degrading, because degrading may not be of use
+  // for releasing other events' memory.
+  // 2. We do not consider eventParsers and database names because they may not exist and if it is
+  // invoked, the event will soon be released.
   @Override
   public long ramBytesUsed() {
-    return INSTANCE_SIZE
-        + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) : 0)
-        + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
-        + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)
-        + (Objects.nonNull(tableNames)
-            ? SET_SIZE
-                + tableNames.stream().mapToLong(RamUsageEstimator::sizeOf).reduce(0L, Long::sum)
-            : 0);
+    return bytes > 0
+        ? bytes
+        : (bytes =
+            INSTANCE_SIZE
+                + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
+                + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0));
   }
 
   private static class PipeInsertNodeTabletInsertionEventResource extends 
PipeEventResource {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 4941bae8a66..acc62f7e7a4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -27,12 +27,8 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpochManager;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
-import java.util.stream.Collectors;
-
 public class PipeRealtimeEventFactory {
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
@@ -54,20 +50,7 @@ public class PipeRealtimeEventFactory {
       final TsFileResource resource) {
     final PipeInsertNodeTabletInsertionEvent insertionEvent =
         new PipeInsertNodeTabletInsertionEvent(
-            isTableModel,
-            databaseNameFromDataRegion,
-            insertNode,
-            insertNode.getTargetPath(),
-            (insertNode instanceof InsertRowsNode)
-                ? ((InsertRowsNode) insertNode)
-                    .getInsertRowNodeList().stream()
-                        .map(
-                            node ->
-                                DeviceIDFactory.getInstance()
-                                    .getDeviceID(node.getTargetPath())
-                                    .getTableName())
-                        .collect(Collectors.toSet())
-                : null);
+            isTableModel, databaseNameFromDataRegion, insertNode);
 
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
         insertionEvent, insertNode, resource);

Reply via email to