This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a4cadf75957 Pipe: Optimized the floating memory calculation & Fixed
potential NPE in insertRows privilege checking (#16039)
a4cadf75957 is described below
commit a4cadf75957e2c9680847f4466f358c1aff3b9f1
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jul 28 10:03:13 2025 +0800
Pipe: Optimized the floating memory calculation & Fixed potential NPE in
insertRows privilege checking (#16039)
* simplify
* comp
* Refactor
* some
* fix-potential-npe
* cache
---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 71 ++++++++++------------
.../event/realtime/PipeRealtimeEventFactory.java | 19 +-----
2 files changed, 33 insertions(+), 57 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index ffa4da79466..855f33b12e2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -64,10 +64,8 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -84,35 +82,27 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
+ RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class)
+ RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class)
+ RamUsageEstimator.shallowSizeOf(Boolean.class);
- private static final long SET_SIZE =
RamUsageEstimator.shallowSizeOfInstance(HashSet.class);
private final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock;
private volatile List<Tablet> tablets;
private List<TabletInsertionEventParser> eventParsers;
- private final PartialPath devicePath;
private InsertNode insertNode;
private ProgressIndex progressIndex;
-
- // Only useful for insertRows
- private final Set<String> tableNames;
+ private long bytes = Long.MIN_VALUE;
private long extractTime = 0;
public PipeInsertNodeTabletInsertionEvent(
final Boolean isTableModel,
final String databaseNameFromDataRegion,
- final InsertNode insertNode,
- final PartialPath devicePath,
- final Set<String> tableNames) {
+ final InsertNode insertNode) {
this(
isTableModel,
databaseNameFromDataRegion,
insertNode,
- devicePath,
- tableNames,
null,
0,
null,
@@ -128,8 +118,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
final Boolean isTableModelEvent,
final String databaseNameFromDataRegion,
final InsertNode insertNode,
- final PartialPath devicePath,
- final Set<String> tableNames,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
@@ -152,9 +140,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
isTableModelEvent,
databaseNameFromDataRegion);
this.insertNode = insertNode;
- // Record device path here so there's no need to get it from InsertNode
cache later.
- this.devicePath = devicePath;
- this.tableNames = tableNames;
this.progressIndex = insertNode.getProgressIndex();
this.allocatedMemoryBlock = new AtomicReference<>();
@@ -169,7 +154,12 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
}
public String getDeviceId() {
- return Objects.nonNull(devicePath) ? devicePath.getFullPath() : null;
+ if (Objects.isNull(insertNode)) {
+ return null;
+ }
+ return Objects.nonNull(insertNode.getTargetPath())
+ ? insertNode.getTargetPath().getFullPath()
+ : null;
}
/////////////////////////// EnrichedEvent ///////////////////////////
@@ -245,8 +235,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
getRawIsTableModelEvent(),
getSourceDatabaseNameFromDataRegion(),
insertNode,
- devicePath,
- tableNames,
pipeName,
creationTime,
pipeTaskMeta,
@@ -268,10 +256,19 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
if (skipIfNoPrivileges || !isTableModelEvent()) {
return;
}
- if (Objects.nonNull(devicePath)) {
-
checkTableName(DeviceIDFactory.getInstance().getDeviceID(devicePath).getTableName());
- } else {
- for (final String tableName : tableNames) {
+ if (Objects.nonNull(insertNode.getTargetPath())) {
+ checkTableName(
+
DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName());
+ } else if (insertNode instanceof InsertRowsNode) {
+ for (final String tableName :
+ ((InsertRowsNode) insertNode)
+ .getInsertRowNodeList().stream()
+ .map(
+ node ->
+ DeviceIDFactory.getInstance()
+ .getDeviceID(node.getTargetPath())
+ .getTableName())
+ .collect(Collectors.toSet())) {
checkTableName(tableName);
}
}
@@ -285,9 +282,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
throw new AccessDeniedException(
String.format(
"No privilege for SELECT for user %s at table %s.%s",
- userName,
- tableModelDatabaseName,
-
DeviceIDFactory.getInstance().getDeviceID(devicePath).getTableName()));
+ userName, tableModelDatabaseName, tableName));
}
}
@@ -550,20 +545,18 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
}
// Notes:
- // 1. We only consider insertion event's memory for degrade and restart,
because degrade/restart
- // may not be of use for releasing other events' memory.
- // 2. We do not consider eventParsers because they may not exist and if it
is invoked, the event
- // will soon be released.
+ // 1. We only consider insertion event's memory for degrading, because
degrading may not be of use
+ // for releasing other events' memory.
+ // 2. We do not consider eventParsers and database names because they may
not exist and if it is
+ // invoked, the event will soon be released.
@Override
public long ramBytesUsed() {
- return INSTANCE_SIZE
- + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath)
: 0)
- + (Objects.nonNull(insertNode) ?
InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
- + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)
- + (Objects.nonNull(tableNames)
- ? SET_SIZE
- +
tableNames.stream().mapToLong(RamUsageEstimator::sizeOf).reduce(0L, Long::sum)
- : 0);
+ return bytes > 0
+ ? bytes
+ : (bytes =
+ INSTANCE_SIZE
+ + (Objects.nonNull(insertNode) ?
InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
+ + (Objects.nonNull(progressIndex) ?
progressIndex.ramBytesUsed() : 0));
}
private static class PipeInsertNodeTabletInsertionEventResource extends
PipeEventResource {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 4941bae8a66..acc62f7e7a4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -27,12 +27,8 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpochManager;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import java.util.stream.Collectors;
-
public class PipeRealtimeEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
@@ -54,20 +50,7 @@ public class PipeRealtimeEventFactory {
final TsFileResource resource) {
final PipeInsertNodeTabletInsertionEvent insertionEvent =
new PipeInsertNodeTabletInsertionEvent(
- isTableModel,
- databaseNameFromDataRegion,
- insertNode,
- insertNode.getTargetPath(),
- (insertNode instanceof InsertRowsNode)
- ? ((InsertRowsNode) insertNode)
- .getInsertRowNodeList().stream()
- .map(
- node ->
- DeviceIDFactory.getInstance()
- .getDeviceID(node.getTargetPath())
- .getTableName())
- .collect(Collectors.toSet())
- : null);
+ isTableModel, databaseNameFromDataRegion, insertNode);
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
insertionEvent, insertNode, resource);