This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b21ddc095 Pipe: fix NPE when updating cache leader after calling getDeviceId() in PipeRawTabletInsertionEvent (#12612)
69b21ddc095 is described below

commit 69b21ddc0952d09e56c39051978980f0f9375f46
Author: Zikun Ma <[email protected]>
AuthorDate: Wed May 29 17:55:54 2024 +0800

    Pipe: fix NPE when updating cache leader after calling getDeviceId() in PipeRawTabletInsertionEvent (#12612)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/connector/client/IoTDBDataNodeAsyncClientManager.java   | 2 +-
 .../connector/client/IoTDBDataNodeCacheLeaderClientManager.java  | 4 ++++
 .../db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java | 2 +-
 .../async/handler/PipeTransferTabletInsertNodeEventHandler.java  | 6 ++----
 .../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java       | 5 ++---
 .../db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java | 9 ++++++++-
 6 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 62eeab93caa..8149a40add7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -253,7 +253,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
   }
 
   public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
-    if (!useLeaderCache) {
+    if (!useLeaderCache || deviceId == null || endPoint == null) {
       return;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index 29e46e60635..be3cb9d4fda 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -105,6 +105,10 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
     }
 
     public void updateLeaderEndPoint(String deviceId, TEndPoint endPoint) {
+      if (deviceId == null || endPoint == null) {
+        return;
+      }
+
       TEndPoint endPointFromMap = endPoints.putIfAbsent(endPoint, endPoint);
       if (endPointFromMap != null) {
         device2endpoint.put(deviceId, endPointFromMap);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 969d8d74d38..2c23cba4542 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -89,7 +89,7 @@ public class IoTDBDataNodeSyncClientManager extends IoTDBSyncClientManager
   }
 
   public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
-    if (!useLeaderCache) {
+    if (!useLeaderCache || deviceId == null || endPoint == null) {
       return;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 660b2677ad5..43dfb6aad29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -46,9 +46,7 @@ public class PipeTransferTabletInsertNodeEventHandler
 
   @Override
   protected void updateLeaderCache(TSStatus status) {
-    if (((PipeInsertNodeTabletInsertionEvent) event).getDeviceId() != null) {
-      connector.updateLeaderCache(
-          ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), status.getRedirectNode());
-    }
+    connector.updateLeaderCache(
+        ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), status.getRedirectNode());
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index cf93bac6ced..01358f20cd3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -287,10 +287,9 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
               pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
           pipeInsertNodeTabletInsertionEvent.toString());
     }
-    // pipeInsertNodeTabletInsertionEvent.getDeviceId() is null for InsertRowsNode
-    if (Objects.nonNull(pipeInsertNodeTabletInsertionEvent.getDeviceId())
-        && status.isSetRedirectNode()) {
+    if (status.isSetRedirectNode()) {
       clientManager.updateLeaderCache(
+          // pipeInsertNodeTabletInsertionEvent.getDeviceId() is null for InsertRowsNode
           pipeInsertNodeTabletInsertionEvent.getDeviceId(), status.getRedirectNode());
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index f227475e648..e792f580981 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -39,6 +39,7 @@ import java.util.function.BiConsumer;
 public class PipeRawTabletInsertionEvent extends EnrichedEvent implements TabletInsertionEvent {
 
   private Tablet tablet;
+  private String deviceId; // Only used when the tablet is released.
   private final boolean isAligned;
 
   private final EnrichedEvent sourceEvent;
@@ -110,6 +111,11 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
   @Override
   public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
     allocatedMemoryBlock.close();
+
+    // Record the deviceId before the memory is released,
+    // for later possibly updating the leader cache.
+    deviceId = tablet.deviceId;
+
     // Actually release the occupied memory.
     tablet = null;
     dataContainer = null;
@@ -183,7 +189,8 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
   }
 
   public String getDeviceId() {
-    return tablet.deviceId;
+    // NonNull indicates that the internallyDecreaseResourceReferenceCount has not been called.
+    return Objects.nonNull(tablet) ? tablet.deviceId : deviceId;
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////

Reply via email to