This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new d5c183482da Pipe: Fix NullPointerException in concurrent event access
(#16849) (#16872)
d5c183482da is described below
commit d5c183482da82276634f43aeecc76902b4db5156
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Dec 5 15:20:09 2025 +0800
Pipe: Fix NullPointerException in concurrent event access (#16849) (#16872)
* fix
* fix
* fix
* fix
* fix
(cherry picked from commit 5bc4779ba435d71a00e97d493deed9dbbad3bcac)
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 7 ++---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 30 ++++++++++++++++++----
2 files changed, 29 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 5b917b5b4e5..e53f91390bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -142,9 +142,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
throw e;
} else {
LOGGER.info(
- "{} in pipe transfer, ignored because the connector subtask is
dropped.",
+ "{} in pipe transfer, ignored because the connector subtask is
dropped.{}",
e.getClass().getSimpleName(),
- e);
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
} catch (final Exception e) {
@@ -161,7 +161,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
e);
} else {
LOGGER.info(
- "Exception in pipe transfer, ignored because the connector subtask
is dropped.", e);
+ "Exception in pipe transfer, ignored because the connector subtask
is dropped.{}",
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index c8390a8baa1..9bdeb11a6a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -106,13 +106,20 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
public ByteBuffer getByteBuffer() throws WALPipeException {
- return insertNode.serializeToByteBuffer();
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
+ return node.serializeToByteBuffer();
}
public String getDeviceId() {
- return Objects.nonNull(insertNode.getDevicePath())
- ? insertNode.getDevicePath().getFullPath()
- : null;
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ return null;
+ }
+ final PartialPath targetPath = node.getDevicePath();
+ return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
}
public long getExtractTime() {
@@ -185,13 +192,21 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
final PipePattern pattern,
final long startTime,
final long endTime) {
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
return new PipeInsertNodeTabletInsertionEvent(
insertNode, pipeName, creationTime, pipeTaskMeta, pattern, startTime,
endTime);
}
@Override
public boolean isGeneratedByPipe() {
- return insertNode.isGeneratedByPipe();
+ final InsertNode node = insertNode;
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
+ return node.isGeneratedByPipe();
}
@Override
@@ -328,6 +343,9 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
dataContainers = new ArrayList<>();
final InsertNode node = getInsertNode();
+ if (Objects.isNull(node)) {
+ throw new PipeException("InsertNode has been released");
+ }
switch (node.getType()) {
case INSERT_ROW:
case INSERT_TABLET:
@@ -393,6 +411,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public String toString() {
+ final InsertNode insertNode = this.insertNode;
return String.format(
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s,
isAligned=%s, isGeneratedByPipe=%s, dataContainers=%s}",
progressIndex,
@@ -405,6 +424,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public String coreReportMessage() {
+ final InsertNode insertNode = this.insertNode;
return String.format(
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s,
isAligned=%s, isGeneratedByPipe=%s}",
progressIndex,