This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5523601c6aa6da3834ba36563f6d5d5384b76813 Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Dec 4 09:43:57 2025 +0800 Pipe: Fix NullPointerException in concurrent event access (#16849) * fix * fix * fix * fix * fix (cherry picked from commit 5bc4779ba435d71a00e97d493deed9dbbad3bcac) --- .../agent/task/subtask/sink/PipeSinkSubtask.java | 7 ++- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 73 +++++++++++++++------- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 8a86db3ded5..f5bafc5d857 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -142,9 +142,9 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { throw e; } else { LOGGER.info( - "{} in pipe transfer, ignored because the connector subtask is dropped.", + "{} in pipe transfer, ignored because the connector subtask is dropped.{}", e.getClass().getSimpleName(), - e); + e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); } } catch (final Exception e) { @@ -161,7 +161,8 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { e); } else { LOGGER.info( - "Exception in pipe transfer, ignored because the connector subtask is dropped.", e); + "Exception in pipe transfer, ignored because the connector subtask is dropped.{}", + e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 63cae75fbbd..f5ae873bbac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -157,16 +157,20 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent } public ByteBuffer getByteBuffer() throws WALPipeException { - return insertNode.serializeToByteBuffer(); + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } + return node.serializeToByteBuffer(); } public String getDeviceId() { - if (Objects.isNull(insertNode)) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { return null; } - return Objects.nonNull(insertNode.getTargetPath()) - ? insertNode.getTargetPath().getFullPath() - : null; + final PartialPath targetPath = node.getTargetPath(); + return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null; } public long getExtractTime() { @@ -244,10 +248,14 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent final boolean skipIfNoPrivileges, final long startTime, final long endTime) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } return new PipeInsertNodeTabletInsertionEvent( getRawIsTableModelEvent(), getSourceDatabaseNameFromDataRegion(), - insertNode, + node, pipeName, creationTime, pipeTaskMeta, @@ -263,7 +271,11 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent @Override public boolean isGeneratedByPipe() { - return insertNode.isGeneratedByPipe(); + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } + return node.isGeneratedByPipe(); } @Override @@ -271,17 +283,22 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent if (skipIfNoPrivileges || !isTableModelEvent()) { return; } - if (Objects.nonNull(insertNode.getTargetPath())) { - checkTableName( - DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName()); - } else if (insertNode instanceof InsertRowsNode) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + // Event is released, skip privilege check + return; + } + final PartialPath targetPath = node.getTargetPath(); + if (Objects.nonNull(targetPath)) { + checkTableName(DeviceIDFactory.getInstance().getDeviceID(targetPath).getTableName()); + } else if (node instanceof InsertRowsNode) { for (final String tableName : - ((InsertRowsNode) insertNode) + ((InsertRowsNode) node) .getInsertRowNodeList().stream() .map( - node -> + insertRowNode -> DeviceIDFactory.getInstance() - .getDeviceID(node.getTargetPath()) + .getDeviceID(insertRowNode.getTargetPath()) .getTableName()) .collect(Collectors.toSet())) { checkTableName(tableName); @@ -443,6 +460,9 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent eventParsers = new ArrayList<>(); final InsertNode node = getInsertNode(); + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } switch (node.getType()) { case INSERT_ROW: case INSERT_TABLET: @@ -526,11 +546,12 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent @Override public String toString() { + final InsertNode node = insertNode; return String.format( "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}", progressIndex, - Objects.nonNull(insertNode) ? insertNode.isAligned() : null, - Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null, + Objects.nonNull(node) ? node.isAligned() : null, + Objects.nonNull(node) ? node.isGeneratedByPipe() : null, eventParsers) + " - " + super.toString(); @@ -538,11 +559,12 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent @Override public String coreReportMessage() { + final InsertNode node = insertNode; return String.format( "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}", progressIndex, - Objects.nonNull(insertNode) ? insertNode.isAligned() : null, - Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null) + Objects.nonNull(node) ? node.isAligned() : null, + Objects.nonNull(node) ? node.isGeneratedByPipe() : null) + " - " + super.coreReportMessage(); } @@ -567,12 +589,15 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent // invoked, the event will soon be released. @Override public long ramBytesUsed() { - return bytes > 0 - ? bytes - : (bytes = - INSTANCE_SIZE - + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0) - + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)); + if (bytes > 0) { + return bytes; + } + final InsertNode node = insertNode; + bytes = + INSTANCE_SIZE + + (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node) : 0) + + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0); + return bytes; } private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource {
