This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3a4900f6ac0739bbeb90879d30e953c6be012923 Author: Caideyipi <[email protected]> AuthorDate: Mon Jul 28 10:03:13 2025 +0800 Pipe: Optimized the floating memory calculation & Fixed potential NPE in insertRows privilege checking (#16039) * simplify * comp * Refactor * some * fix-potential-npe * cache (cherry picked from commit a4cadf75957e2c9680847f4466f358c1aff3b9f1) --- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 71 ++++++++++------------ .../event/realtime/PipeRealtimeEventFactory.java | 19 +----- 2 files changed, 33 insertions(+), 57 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index ffa4da79466..855f33b12e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -64,10 +64,8 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -84,35 +82,27 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent + RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class) + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class) + RamUsageEstimator.shallowSizeOf(Boolean.class); - private static final long SET_SIZE = RamUsageEstimator.shallowSizeOfInstance(HashSet.class); private final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock; private volatile List<Tablet> tablets; private List<TabletInsertionEventParser> eventParsers; - private final PartialPath devicePath; private InsertNode insertNode; private ProgressIndex progressIndex; - - // Only useful for insertRows - private final Set<String> tableNames; + private long bytes = Long.MIN_VALUE; private long extractTime = 0; public PipeInsertNodeTabletInsertionEvent( final Boolean isTableModel, final String databaseNameFromDataRegion, - final InsertNode insertNode, - final PartialPath devicePath, - final Set<String> tableNames) { + final InsertNode insertNode) { this( isTableModel, databaseNameFromDataRegion, insertNode, - devicePath, - tableNames, null, 0, null, @@ -128,8 +118,6 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent final Boolean isTableModelEvent, final String databaseNameFromDataRegion, final InsertNode insertNode, - final PartialPath devicePath, - final Set<String> tableNames, final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, @@ -152,9 +140,6 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent isTableModelEvent, databaseNameFromDataRegion); this.insertNode = insertNode; - // Record device path here so there's no need to get it from InsertNode cache later. - this.devicePath = devicePath; - this.tableNames = tableNames; this.progressIndex = insertNode.getProgressIndex(); this.allocatedMemoryBlock = new AtomicReference<>(); @@ -169,7 +154,12 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent } public String getDeviceId() { - return Objects.nonNull(devicePath) ? devicePath.getFullPath() : null; + if (Objects.isNull(insertNode)) { + return null; + } + return Objects.nonNull(insertNode.getTargetPath()) + ? insertNode.getTargetPath().getFullPath() + : null; } /////////////////////////// EnrichedEvent /////////////////////////// @@ -245,8 +235,6 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent getRawIsTableModelEvent(), getSourceDatabaseNameFromDataRegion(), insertNode, - devicePath, - tableNames, pipeName, creationTime, pipeTaskMeta, @@ -268,10 +256,19 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent if (skipIfNoPrivileges || !isTableModelEvent()) { return; } - if (Objects.nonNull(devicePath)) { - checkTableName(DeviceIDFactory.getInstance().getDeviceID(devicePath).getTableName()); - } else { - for (final String tableName : tableNames) { + if (Objects.nonNull(insertNode.getTargetPath())) { + checkTableName( + DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName()); + } else if (insertNode instanceof InsertRowsNode) { + for (final String tableName : + ((InsertRowsNode) insertNode) + .getInsertRowNodeList().stream() + .map( + node -> + DeviceIDFactory.getInstance() + .getDeviceID(node.getTargetPath()) + .getTableName()) + .collect(Collectors.toSet())) { checkTableName(tableName); } } @@ -285,9 +282,7 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent throw new AccessDeniedException( String.format( "No privilege for SELECT for user %s at table %s.%s", - userName, - tableModelDatabaseName, - DeviceIDFactory.getInstance().getDeviceID(devicePath).getTableName())); + userName, tableModelDatabaseName, tableName)); } } @@ -550,20 +545,18 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent } // Notes: - // 1. We only consider insertion event's memory for degrade and restart, because degrade/restart - // may not be of use for releasing other events' memory. - // 2. We do not consider eventParsers because they may not exist and if it is invoked, the event - // will soon be released. + // 1. We only consider insertion event's memory for degrading, because degrading may not be of use + // for releasing other events' memory. + // 2. We do not consider eventParsers and database names because they may not exist and if it is + // invoked, the event will soon be released. @Override public long ramBytesUsed() { - return INSTANCE_SIZE - + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) : 0) - + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0) - + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0) - + (Objects.nonNull(tableNames) - ? SET_SIZE - + tableNames.stream().mapToLong(RamUsageEstimator::sizeOf).reduce(0L, Long::sum) - : 0); + return bytes > 0 + ? bytes + : (bytes = + INSTANCE_SIZE + + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0) + + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)); } private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 4941bae8a66..acc62f7e7a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -27,12 +27,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpochManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; -import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import java.util.stream.Collectors; - public class PipeRealtimeEventFactory { private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager(); @@ -54,20 +50,7 @@ public class PipeRealtimeEventFactory { final TsFileResource resource) { final PipeInsertNodeTabletInsertionEvent insertionEvent = new PipeInsertNodeTabletInsertionEvent( - isTableModel, - databaseNameFromDataRegion, - insertNode, - insertNode.getTargetPath(), - (insertNode instanceof InsertRowsNode) - ? ((InsertRowsNode) insertNode) - .getInsertRowNodeList().stream() - .map( - node -> - DeviceIDFactory.getInstance() - .getDeviceID(node.getTargetPath()) - .getTableName()) - .collect(Collectors.toSet()) - : null); + isTableModel, databaseNameFromDataRegion, insertNode); return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( insertionEvent, insertNode, resource);
