This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8888377619e [region migration] Fix regionRWLock NPE when migrating region concurrently (#13217)
8888377619e is described below

commit 8888377619e7b89290d6f2ac3cfe02177f691f27
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Aug 20 19:16:47 2024 +0800

    [region migration] Fix regionRWLock NPE when migrating region concurrently (#13217)
---
 .../impl/DataNodeInternalRPCServiceImpl.java       |   2 +-
 .../execution/executor/RegionExecutionResult.java  |  21 +++-
 .../execution/executor/RegionReadExecutor.java     |  42 ++++---
 .../execution/executor/RegionWriteExecutor.java    | 132 ++++++++-------------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   2 +-
 5 files changed, 89 insertions(+), 110 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9a2201075d2..36091acbe24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -365,7 +365,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
     resp.setAccepted(executionResult.isAccepted());
     resp.setMessage(executionResult.getMessage());
-    resp.setNeedRetry(executionResult.isNeedRetry());
+    resp.setNeedRetry(executionResult.isReadNeedRetry());
     resp.setStatus(executionResult.getStatus());
     return resp;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
index 2047bba61ac..c3d886eebcc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
@@ -28,7 +28,18 @@ public class RegionExecutionResult {
   private String message;
 
   private TSStatus status;
-  private boolean needRetry;
+  private boolean readNeedRetry;
+
+  private RegionExecutionResult(boolean accepted, String message, TSStatus 
status) {
+    this.accepted = accepted;
+    this.message = message;
+    this.status = status;
+    this.readNeedRetry = false;
+  }
+
+  public static RegionExecutionResult create(boolean accepted, String message, 
TSStatus status) {
+    return new RegionExecutionResult(accepted, message, status);
+  }
 
   public boolean isAccepted() {
     return accepted;
@@ -54,11 +65,11 @@ public class RegionExecutionResult {
     this.status = status;
   }
 
-  public boolean isNeedRetry() {
-    return needRetry;
+  public boolean isReadNeedRetry() {
+    return readNeedRetry;
   }
 
-  public void setNeedRetry(boolean needRetry) {
-    this.needRetry = needRetry;
+  public void setReadNeedRetry(boolean readNeedRetry) {
+    this.readNeedRetry = readNeedRetry;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 8e9a16e9b14..468550d4e9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -77,7 +77,6 @@ public class RegionReadExecutor {
   public RegionExecutionResult execute(
       ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
     // execute fragment instance in state machine
-    RegionExecutionResult resp = new RegionExecutionResult();
     try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
       DataSet readResponse;
       if (groupId instanceof DataRegionId) {
@@ -87,35 +86,39 @@ public class RegionReadExecutor {
       }
       if (readResponse == null) {
         LOGGER.error(RESPONSE_NULL_ERROR_MSG);
-        resp.setAccepted(false);
-        resp.setMessage(RESPONSE_NULL_ERROR_MSG);
+        return RegionExecutionResult.create(false, RESPONSE_NULL_ERROR_MSG, 
null);
       } else {
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse;
-        resp.setAccepted(!info.getState().isFailed());
-        resp.setMessage(info.getMessage());
+        RegionExecutionResult resp =
+            RegionExecutionResult.create(!info.getState().isFailed(), 
info.getMessage(), null);
         info.getErrorCode()
             .ifPresent(
                 s -> {
                   resp.setStatus(s);
-                  resp.setNeedRetry(StatusUtils.needRetryHelper(s));
+                  resp.setReadNeedRetry(StatusUtils.needRetryHelper(s));
                 });
+        return resp;
       }
-      return resp;
     } catch (ConsensusGroupNotExistException e) {
-      LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, e);
-      resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage()));
-      resp.setNeedRetry(true);
-      resp.setStatus(new 
TSStatus(TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()));
+      LOGGER.warn("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, e);
+      RegionExecutionResult resp =
+          RegionExecutionResult.create(
+              false,
+              String.format(ERROR_MSG_FORMAT, e.getMessage()),
+              new 
TSStatus(TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()));
+      resp.setReadNeedRetry(true);
       return resp;
     } catch (Throwable e) {
-      LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, e);
-      resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage()));
+      LOGGER.warn("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, e);
+      RegionExecutionResult resp =
+          RegionExecutionResult.create(
+              false, String.format(ERROR_MSG_FORMAT, e.getMessage()), null);
       Throwable t = e.getCause();
       if (t instanceof ReadException
           || t instanceof ReadIndexException
           || t instanceof NotLeaderException
           || t instanceof ServerNotReadyException) {
-        resp.setNeedRetry(true);
+        resp.setReadNeedRetry(true);
         resp.setStatus(new 
TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()));
       }
       return resp;
@@ -126,20 +129,15 @@ public class RegionReadExecutor {
   public RegionExecutionResult execute(FragmentInstance fragmentInstance) {
     // execute fragment instance in state machine
     try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
-      RegionExecutionResult resp = new RegionExecutionResult();
       // FI with queryExecutor will be executed directly
       FragmentInstanceInfo info =
           fragmentInstanceManager.execDataQueryFragmentInstance(
               fragmentInstance, VirtualDataRegion.getInstance());
-      resp.setAccepted(!info.getState().isFailed());
-      resp.setMessage(info.getMessage());
-      return resp;
+      return RegionExecutionResult.create(!info.getState().isFailed(), 
info.getMessage(), null);
     } catch (Throwable t) {
       LOGGER.error("Execute FragmentInstance in QueryExecutor failed.", t);
-      RegionExecutionResult resp = new RegionExecutionResult();
-      resp.setAccepted(false);
-      resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
-      return resp;
+      return RegionExecutionResult.create(
+          false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index ad3dfd64cc8..76db013a87b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -148,11 +148,10 @@ public class RegionWriteExecutor {
       return planNode.accept(executionVisitor, context);
     } catch (Throwable e) {
       LOGGER.warn(e.getMessage(), e);
-      RegionExecutionResult result = new RegionExecutionResult();
-      result.setAccepted(false);
-      result.setMessage(e.getMessage());
-      result.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage()));
-      return result;
+      return RegionExecutionResult.create(
+          false,
+          e.getMessage(),
+          RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage()));
     }
   }
 
@@ -161,31 +160,29 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitPlan(PlanNode node, 
WritePlanNodeExecutionContext context) {
-      RegionExecutionResult response = new RegionExecutionResult();
 
       if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
-        response.setAccepted(false);
-        response.setMessage("Fail to do non-query operations because system is 
read-only.");
-        response.setStatus(
+        return RegionExecutionResult.create(
+            false,
+            "Fail to do non-query operations because system is read-only.",
             RpcUtils.getStatus(
                 TSStatusCode.SYSTEM_READ_ONLY,
                 "Fail to do non-query operations because system is 
read-only."));
-        return response;
       }
 
       try {
         TSStatus status = 
executePlanNodeInConsensusLayer(context.getRegionId(), node);
-        response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
status.getCode());
-        response.setMessage(status.getMessage());
-        response.setStatus(status);
+        return RegionExecutionResult.create(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
+            status.getMessage(),
+            status);
       } catch (ConsensusException e) {
         LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
-        response.setAccepted(false);
-        response.setMessage(e.toString());
-        response.setStatus(
+        return RegionExecutionResult.create(
+            false,
+            e.toString(),
             RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage()));
       }
-      return response;
     }
 
     private TSStatus executePlanNodeInConsensusLayer(ConsensusGroupId groupId, 
PlanNode planNode)
@@ -241,22 +238,25 @@ public class RegionWriteExecutor {
 
     private RegionExecutionResult executeDataInsert(
         InsertNode insertNode, WritePlanNodeExecutionContext context) {
-      RegionExecutionResult response = new RegionExecutionResult();
+      if (context.getRegionWriteValidationRWLock() == null) {
+        String message = "Failed to get the lock of the region because the 
region is not existed.";
+        return RegionExecutionResult.create(
+            false, message, 
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, message));
+      }
+
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
         TSStatus status = fireTriggerAndInsert(context.getRegionId(), 
insertNode);
-        response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
status.getCode());
-        response.setMessage(status.message);
-        if (!response.isAccepted()) {
-          response.setStatus(status);
-        }
-        return response;
+        return RegionExecutionResult.create(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
+            status.message,
+            status);
       } catch (ConsensusException e) {
         LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
-        response.setAccepted(false);
-        response.setMessage(e.toString());
-        
response.setStatus(RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, 
e.toString()));
-        return response;
+        return RegionExecutionResult.create(
+            false,
+            e.toString(),
+            RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, 
e.toString()));
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -354,13 +354,11 @@ public class RegionWriteExecutor {
           } else {
             MetadataException metadataException = failingMeasurementMap.get(0);
             LOGGER.warn(METADATA_ERROR_MSG, metadataException);
-            result = new RegionExecutionResult();
-            result.setAccepted(false);
-            result.setMessage(metadataException.getMessage());
-            result.setStatus(
+            return RegionExecutionResult.create(
+                false,
+                metadataException.getMessage(),
                 RpcUtils.getStatus(
                     metadataException.getErrorCode(), 
metadataException.getMessage()));
-            return result;
           }
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
@@ -403,13 +401,11 @@ public class RegionWriteExecutor {
           } else {
             MetadataException metadataException = 
failingMeasurementMap.values().iterator().next();
             LOGGER.warn(METADATA_ERROR_MSG, metadataException);
-            result = new RegionExecutionResult();
-            result.setAccepted(false);
-            result.setMessage(metadataException.getMessage());
-            result.setStatus(
+            return RegionExecutionResult.create(
+                false,
+                metadataException.getMessage(),
                 RpcUtils.getStatus(
                     metadataException.getErrorCode(), 
metadataException.getMessage()));
-            return result;
           }
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
@@ -466,11 +462,7 @@ public class RegionWriteExecutor {
           }
 
           TSStatus status = RpcUtils.getStatus(failingStatus);
-          failingResult = new RegionExecutionResult();
-          failingResult.setAccepted(false);
-          failingResult.setMessage(status.getMessage());
-          failingResult.setStatus(status);
-          return failingResult;
+          return RegionExecutionResult.create(false, status.getMessage(), 
status);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
@@ -705,11 +697,8 @@ public class RegionWriteExecutor {
       try {
         schemaRegion.checkSchemaQuota(path, size);
       } catch (SchemaQuotaExceededException e) {
-        RegionExecutionResult result = new RegionExecutionResult();
-        result.setAccepted(false);
-        result.setMessage(e.getMessage());
-        result.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-        return result;
+        return RegionExecutionResult.create(
+            false, e.getMessage(), RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
       }
       return null;
     }
@@ -723,22 +712,17 @@ public class RegionWriteExecutor {
       separateMeasurementAlreadyExistException(
           failingStatus, executionStatus, alreadyExistingStatus);
 
-      RegionExecutionResult result = new RegionExecutionResult();
+      boolean isAccepted = true;
       TSStatus status;
       if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
         status = RpcUtils.SUCCESS_STATUS;
-        result.setAccepted(true);
       } else if (failingStatus.isEmpty()) {
         status = RpcUtils.getStatus(alreadyExistingStatus);
-        result.setAccepted(true);
       } else {
         status = RpcUtils.getStatus(failingStatus);
-        result.setAccepted(false);
+        isAccepted = false;
       }
-
-      result.setMessage(status.getMessage());
-      result.setStatus(status);
-      return result;
+      return RegionExecutionResult.create(isAccepted, status.getMessage(), 
status);
     }
 
     private void separateMeasurementAlreadyExistException(
@@ -791,11 +775,8 @@ public class RegionWriteExecutor {
             ? super.visitPipeEnrichedWritePlanNode(new 
PipeEnrichedWritePlanNode(node), context)
             : super.visitAlterTimeSeries(node, context);
       } catch (MetadataException e) {
-        RegionExecutionResult result = new RegionExecutionResult();
-        result.setAccepted(true);
-        result.setMessage(e.getMessage());
-        result.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
-        return result;
+        return RegionExecutionResult.create(
+            true, e.getMessage(), RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
       }
     }
 
@@ -817,15 +798,12 @@ public class RegionWriteExecutor {
         if (templateSetInfo == null) {
           // The activation has already been validated during analyzing.
           // That means the template is being unset during the activation plan 
transport.
-          RegionExecutionResult result = new RegionExecutionResult();
-          result.setAccepted(false);
           String message =
               String.format(
                   "Template is being unsetting from path %s. Please try 
activating later.",
                   node.getPathSetTemplate());
-          result.setMessage(message);
-          result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
-          return result;
+          return RegionExecutionResult.create(
+              false, message, RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
         }
         ISchemaRegion schemaRegion =
             schemaEngine.getSchemaRegion((SchemaRegionId) 
context.getRegionId());
@@ -865,15 +843,12 @@ public class RegionWriteExecutor {
           if (templateSetInfo == null) {
             // The activation has already been validated during analyzing.
             // That means the template is being unset during the activation 
plan transport.
-            RegionExecutionResult result = new RegionExecutionResult();
-            result.setAccepted(false);
             String message =
                 String.format(
                     "Template is being unsetting from path %s. Please try 
activating later.",
                     node.getPathSetTemplate(devicePath));
-            result.setMessage(message);
-            result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
-            return result;
+            return RegionExecutionResult.create(
+                false, message, 
RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message));
           }
           RegionExecutionResult result =
               checkQuotaBeforeCreatingTimeSeries(
@@ -913,17 +888,14 @@ public class RegionWriteExecutor {
           if (templateSetInfo == null) {
             // The activation has already been validated during analyzing.
             // That means the template is being unset during the activation 
plan transport.
-            RegionExecutionResult result = new RegionExecutionResult();
-            result.setAccepted(false);
             String message =
                 String.format(
                     "Template is being unsetting from prefix path of %s. 
Please try activating later.",
                     new PartialPath(
                             Arrays.copyOf(entry.getKey().getNodes(), 
entry.getValue().right + 1))
                         .getFullPath());
-            result.setMessage(message);
-            result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
-            return result;
+            return RegionExecutionResult.create(
+                false, message, 
RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message));
           }
           RegionExecutionResult result =
               checkQuotaBeforeCreatingTimeSeries(
@@ -975,13 +947,11 @@ public class RegionWriteExecutor {
           if (!failingMetadataException.isEmpty()) {
             MetadataException metadataException = 
failingMetadataException.get(0);
             LOGGER.warn(METADATA_ERROR_MSG, metadataException);
-            RegionExecutionResult result = new RegionExecutionResult();
-            result.setAccepted(false);
-            result.setMessage(metadataException.getMessage());
-            result.setStatus(
+            return RegionExecutionResult.create(
+                false,
+                metadataException.getMessage(),
                 RpcUtils.getStatus(
                     metadataException.getErrorCode(), 
metadataException.getMessage()));
-            return result;
           }
           // step 2. make sure all source paths exist.
           return receivedFromPipe
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 8c4e79f5d44..7b4a4e30490 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -455,7 +455,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
                 : readExecutor.execute(groupId, instance);
         if (!readResult.isAccepted()) {
           LOGGER.warn(readResult.getMessage());
-          if (readResult.isNeedRetry()) {
+          if (readResult.isReadNeedRetry()) {
             if (readResult.getStatus() != null
                 && readResult.getStatus().getCode()
                     == 
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {

Reply via email to