This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5bc4779ba43 Pipe: Fix NullPointerException in concurrent event access (#16849)
5bc4779ba43 is described below
commit 5bc4779ba435d71a00e97d493deed9dbbad3bcac
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Dec 4 09:43:57 2025 +0800
Pipe: Fix NullPointerException in concurrent event access (#16849)
* fix
* fix
* fix
* fix
* fix
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 7 ++-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 73 +++++++++++++++-------
2 files changed, 53 insertions(+), 27 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 8a86db3ded5..f5bafc5d857 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -142,9 +142,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
throw e;
} else {
LOGGER.info(
- "{} in pipe transfer, ignored because the connector subtask is
dropped.",
+ "{} in pipe transfer, ignored because the connector subtask is
dropped.{}",
e.getClass().getSimpleName(),
- e);
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
} catch (final Exception e) {
@@ -161,7 +161,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
e);
} else {
LOGGER.info(
- "Exception in pipe transfer, ignored because the connector subtask
is dropped.", e);
+ "Exception in pipe transfer, ignored because the connector subtask
is dropped.{}",
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 63cae75fbbd..f5ae873bbac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -157,16 +157,20 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
}
public ByteBuffer getByteBuffer() throws WALPipeException {
- return insertNode.serializeToByteBuffer();
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
+ return node.serializeToByteBuffer();
}
public String getDeviceId() {
- if (Objects.isNull(insertNode)) {
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
return null;
}
- return Objects.nonNull(insertNode.getTargetPath())
- ? insertNode.getTargetPath().getFullPath()
- : null;
+ final PartialPath targetPath = node.getTargetPath();
+ return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
}
public long getExtractTime() {
@@ -244,10 +248,14 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
final boolean skipIfNoPrivileges,
final long startTime,
final long endTime) {
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
return new PipeInsertNodeTabletInsertionEvent(
getRawIsTableModelEvent(),
getSourceDatabaseNameFromDataRegion(),
- insertNode,
+ node,
pipeName,
creationTime,
pipeTaskMeta,
@@ -263,7 +271,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
@Override
public boolean isGeneratedByPipe() {
- return insertNode.isGeneratedByPipe();
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
+ return node.isGeneratedByPipe();
}
@Override
@@ -271,17 +283,22 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
if (skipIfNoPrivileges || !isTableModelEvent()) {
return;
}
- if (Objects.nonNull(insertNode.getTargetPath())) {
- checkTableName(
-
DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName());
- } else if (insertNode instanceof InsertRowsNode) {
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ // Event is released, skip privilege check
+ return;
+ }
+ final PartialPath targetPath = node.getTargetPath();
+ if (Objects.nonNull(targetPath)) {
+
checkTableName(DeviceIDFactory.getInstance().getDeviceID(targetPath).getTableName());
+ } else if (node instanceof InsertRowsNode) {
for (final String tableName :
- ((InsertRowsNode) insertNode)
+ ((InsertRowsNode) node)
.getInsertRowNodeList().stream()
.map(
- node ->
+ insertRowNode ->
DeviceIDFactory.getInstance()
- .getDeviceID(node.getTargetPath())
+ .getDeviceID(insertRowNode.getTargetPath())
.getTableName())
.collect(Collectors.toSet())) {
checkTableName(tableName);
@@ -443,6 +460,9 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
eventParsers = new ArrayList<>();
final InsertNode node = getInsertNode();
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
switch (node.getType()) {
case INSERT_ROW:
case INSERT_TABLET:
@@ -526,11 +546,12 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
@Override
public String toString() {
+ final InsertNode node = insertNode;
return String.format(
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s,
isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}",
progressIndex,
- Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
- Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() :
null,
+ Objects.nonNull(node) ? node.isAligned() : null,
+ Objects.nonNull(node) ? node.isGeneratedByPipe() : null,
eventParsers)
+ " - "
+ super.toString();
@@ -538,11 +559,12 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
@Override
public String coreReportMessage() {
+ final InsertNode node = insertNode;
return String.format(
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s,
isAligned=%s, isGeneratedByPipe=%s}",
progressIndex,
- Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
- Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() :
null)
+ Objects.nonNull(node) ? node.isAligned() : null,
+ Objects.nonNull(node) ? node.isGeneratedByPipe() : null)
+ " - "
+ super.coreReportMessage();
}
@@ -567,12 +589,15 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
// invoked, the event will soon be released.
@Override
public long ramBytesUsed() {
- return bytes > 0
- ? bytes
- : (bytes =
- INSTANCE_SIZE
- + (Objects.nonNull(insertNode) ?
InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
- + (Objects.nonNull(progressIndex) ?
progressIndex.ramBytesUsed() : 0));
+ if (bytes > 0) {
+ return bytes;
+ }
+ final InsertNode node = insertNode;
+ bytes =
+ INSTANCE_SIZE
+ + (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node)
: 0)
+ + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() :
0);
+ return bytes;
}
private static class PipeInsertNodeTabletInsertionEventResource extends
PipeEventResource {