This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 69b21ddc095 Pipe: fix NPE when updating cache leader after calling
getDeviceId() in PipeRawTabletInsertionEvent (#12612)
69b21ddc095 is described below
commit 69b21ddc0952d09e56c39051978980f0f9375f46
Author: Zikun Ma <[email protected]>
AuthorDate: Wed May 29 17:55:54 2024 +0800
Pipe: fix NPE when updating cache leader after calling getDeviceId() in
PipeRawTabletInsertionEvent (#12612)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/connector/client/IoTDBDataNodeAsyncClientManager.java | 2 +-
.../connector/client/IoTDBDataNodeCacheLeaderClientManager.java | 4 ++++
.../db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java | 2 +-
.../async/handler/PipeTransferTabletInsertNodeEventHandler.java | 6 ++----
.../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java | 5 ++---
.../db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java | 9 ++++++++-
6 files changed, 18 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 62eeab93caa..8149a40add7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -253,7 +253,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
- if (!useLeaderCache) {
+ if (!useLeaderCache || deviceId == null || endPoint == null) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index 29e46e60635..be3cb9d4fda 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -105,6 +105,10 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
}
public void updateLeaderEndPoint(String deviceId, TEndPoint endPoint) {
+ if (deviceId == null || endPoint == null) {
+ return;
+ }
+
TEndPoint endPointFromMap = endPoints.putIfAbsent(endPoint, endPoint);
if (endPointFromMap != null) {
device2endpoint.put(deviceId, endPointFromMap);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 969d8d74d38..2c23cba4542 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -89,7 +89,7 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
}
public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
- if (!useLeaderCache) {
+ if (!useLeaderCache || deviceId == null || endPoint == null) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 660b2677ad5..43dfb6aad29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -46,9 +46,7 @@ public class PipeTransferTabletInsertNodeEventHandler
@Override
protected void updateLeaderCache(TSStatus status) {
- if (((PipeInsertNodeTabletInsertionEvent) event).getDeviceId() != null) {
- connector.updateLeaderCache(
- ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
- }
+ connector.updateLeaderCache(
+ ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index cf93bac6ced..01358f20cd3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -287,10 +287,9 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
pipeInsertNodeTabletInsertionEvent.toString());
}
- // pipeInsertNodeTabletInsertionEvent.getDeviceId() is null for
InsertRowsNode
- if (Objects.nonNull(pipeInsertNodeTabletInsertionEvent.getDeviceId())
- && status.isSetRedirectNode()) {
+ if (status.isSetRedirectNode()) {
clientManager.updateLeaderCache(
+ // pipeInsertNodeTabletInsertionEvent.getDeviceId() is null for
InsertRowsNode
pipeInsertNodeTabletInsertionEvent.getDeviceId(),
status.getRedirectNode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index f227475e648..e792f580981 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -39,6 +39,7 @@ import java.util.function.BiConsumer;
public class PipeRawTabletInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
private Tablet tablet;
+ private String deviceId; // Only used when the tablet is released.
private final boolean isAligned;
private final EnrichedEvent sourceEvent;
@@ -110,6 +111,11 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
@Override
public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
allocatedMemoryBlock.close();
+
+ // Record the deviceId before the memory is released,
+ // for later possibly updating the leader cache.
+ deviceId = tablet.deviceId;
+
// Actually release the occupied memory.
tablet = null;
dataContainer = null;
@@ -183,7 +189,8 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
}
public String getDeviceId() {
- return tablet.deviceId;
+ // NonNull indicates that the internallyDecreaseResourceReferenceCount has
not been called.
+ return Objects.nonNull(tablet) ? tablet.deviceId : deviceId;
}
/////////////////////////// TabletInsertionEvent ///////////////////////////