This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new f586595471d [region migration] Fix regionRWLock NPE when migrating region concurrently (#13217) (#13241)
f586595471d is described below
commit f586595471d34eb0c561790b9007855d3612e0ef
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Aug 21 11:35:31 2024 +0800
[region migration] Fix regionRWLock NPE when migrating region concurrently (#13217) (#13241)
(cherry picked from commit 8888377619e7b89290d6f2ac3cfe02177f691f27)
---
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../execution/executor/RegionExecutionResult.java | 21 +++-
.../execution/executor/RegionReadExecutor.java | 42 ++++---
.../execution/executor/RegionWriteExecutor.java | 132 ++++++++-------------
.../scheduler/FragmentInstanceDispatcherImpl.java | 2 +-
5 files changed, 89 insertions(+), 110 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 870aed465e4..7755bb99e31 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -357,7 +357,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
resp.setAccepted(executionResult.isAccepted());
resp.setMessage(executionResult.getMessage());
- resp.setNeedRetry(executionResult.isNeedRetry());
+ resp.setNeedRetry(executionResult.isReadNeedRetry());
resp.setStatus(executionResult.getStatus());
return resp;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
index 2047bba61ac..c3d886eebcc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
@@ -28,7 +28,18 @@ public class RegionExecutionResult {
private String message;
private TSStatus status;
- private boolean needRetry;
+ private boolean readNeedRetry;
+
+ private RegionExecutionResult(boolean accepted, String message, TSStatus
status) {
+ this.accepted = accepted;
+ this.message = message;
+ this.status = status;
+ this.readNeedRetry = false;
+ }
+
+ public static RegionExecutionResult create(boolean accepted, String message,
TSStatus status) {
+ return new RegionExecutionResult(accepted, message, status);
+ }
public boolean isAccepted() {
return accepted;
@@ -54,11 +65,11 @@ public class RegionExecutionResult {
this.status = status;
}
- public boolean isNeedRetry() {
- return needRetry;
+ public boolean isReadNeedRetry() {
+ return readNeedRetry;
}
- public void setNeedRetry(boolean needRetry) {
- this.needRetry = needRetry;
+ public void setReadNeedRetry(boolean readNeedRetry) {
+ this.readNeedRetry = readNeedRetry;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 8e9a16e9b14..468550d4e9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -77,7 +77,6 @@ public class RegionReadExecutor {
public RegionExecutionResult execute(
ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
// execute fragment instance in state machine
- RegionExecutionResult resp = new RegionExecutionResult();
try (SetThreadName threadName = new
SetThreadName(fragmentInstance.getId().getFullId())) {
DataSet readResponse;
if (groupId instanceof DataRegionId) {
@@ -87,35 +86,39 @@ public class RegionReadExecutor {
}
if (readResponse == null) {
LOGGER.error(RESPONSE_NULL_ERROR_MSG);
- resp.setAccepted(false);
- resp.setMessage(RESPONSE_NULL_ERROR_MSG);
+ return RegionExecutionResult.create(false, RESPONSE_NULL_ERROR_MSG,
null);
} else {
FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse;
- resp.setAccepted(!info.getState().isFailed());
- resp.setMessage(info.getMessage());
+ RegionExecutionResult resp =
+ RegionExecutionResult.create(!info.getState().isFailed(),
info.getMessage(), null);
info.getErrorCode()
.ifPresent(
s -> {
resp.setStatus(s);
- resp.setNeedRetry(StatusUtils.needRetryHelper(s));
+ resp.setReadNeedRetry(StatusUtils.needRetryHelper(s));
});
+ return resp;
}
- return resp;
} catch (ConsensusGroupNotExistException e) {
- LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, e);
- resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage()));
- resp.setNeedRetry(true);
- resp.setStatus(new
TSStatus(TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()));
+ LOGGER.warn("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, e);
+ RegionExecutionResult resp =
+ RegionExecutionResult.create(
+ false,
+ String.format(ERROR_MSG_FORMAT, e.getMessage()),
+ new
TSStatus(TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()));
+ resp.setReadNeedRetry(true);
return resp;
} catch (Throwable e) {
- LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, e);
- resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage()));
+ LOGGER.warn("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, e);
+ RegionExecutionResult resp =
+ RegionExecutionResult.create(
+ false, String.format(ERROR_MSG_FORMAT, e.getMessage()), null);
Throwable t = e.getCause();
if (t instanceof ReadException
|| t instanceof ReadIndexException
|| t instanceof NotLeaderException
|| t instanceof ServerNotReadyException) {
- resp.setNeedRetry(true);
+ resp.setReadNeedRetry(true);
resp.setStatus(new
TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()));
}
return resp;
@@ -126,20 +129,15 @@ public class RegionReadExecutor {
public RegionExecutionResult execute(FragmentInstance fragmentInstance) {
// execute fragment instance in state machine
try (SetThreadName threadName = new
SetThreadName(fragmentInstance.getId().getFullId())) {
- RegionExecutionResult resp = new RegionExecutionResult();
// FI with queryExecutor will be executed directly
FragmentInstanceInfo info =
fragmentInstanceManager.execDataQueryFragmentInstance(
fragmentInstance, VirtualDataRegion.getInstance());
- resp.setAccepted(!info.getState().isFailed());
- resp.setMessage(info.getMessage());
- return resp;
+ return RegionExecutionResult.create(!info.getState().isFailed(),
info.getMessage(), null);
} catch (Throwable t) {
LOGGER.error("Execute FragmentInstance in QueryExecutor failed.", t);
- RegionExecutionResult resp = new RegionExecutionResult();
- resp.setAccepted(false);
- resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
- return resp;
+ return RegionExecutionResult.create(
+ false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index ee4ab738f80..fd0411994ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -147,11 +147,10 @@ public class RegionWriteExecutor {
return planNode.accept(executionVisitor, context);
} catch (Throwable e) {
LOGGER.warn(e.getMessage(), e);
- RegionExecutionResult result = new RegionExecutionResult();
- result.setAccepted(false);
- result.setMessage(e.getMessage());
- result.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage()));
- return result;
+ return RegionExecutionResult.create(
+ false,
+ e.getMessage(),
+ RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage()));
}
}
@@ -160,31 +159,29 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitPlan(PlanNode node,
WritePlanNodeExecutionContext context) {
- RegionExecutionResult response = new RegionExecutionResult();
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
- response.setAccepted(false);
- response.setMessage("Fail to do non-query operations because system is
read-only.");
- response.setStatus(
+ return RegionExecutionResult.create(
+ false,
+ "Fail to do non-query operations because system is read-only.",
RpcUtils.getStatus(
TSStatusCode.SYSTEM_READ_ONLY,
"Fail to do non-query operations because system is
read-only."));
- return response;
}
try {
TSStatus status =
executePlanNodeInConsensusLayer(context.getRegionId(), node);
- response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
status.getCode());
- response.setMessage(status.getMessage());
- response.setStatus(status);
+ return RegionExecutionResult.create(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
+ status.getMessage(),
+ status);
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
- response.setAccepted(false);
- response.setMessage(e.toString());
- response.setStatus(
+ return RegionExecutionResult.create(
+ false,
+ e.toString(),
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage()));
}
- return response;
}
private TSStatus executePlanNodeInConsensusLayer(ConsensusGroupId groupId,
PlanNode planNode)
@@ -234,22 +231,25 @@ public class RegionWriteExecutor {
private RegionExecutionResult executeDataInsert(
InsertNode insertNode, WritePlanNodeExecutionContext context) {
- RegionExecutionResult response = new RegionExecutionResult();
+ if (context.getRegionWriteValidationRWLock() == null) {
+ String message = "Failed to get the lock of the region because the
region is not existed.";
+ return RegionExecutionResult.create(
+ false, message,
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, message));
+ }
+
context.getRegionWriteValidationRWLock().readLock().lock();
try {
TSStatus status = fireTriggerAndInsert(context.getRegionId(),
insertNode);
- response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
status.getCode());
- response.setMessage(status.message);
- if (!response.isAccepted()) {
- response.setStatus(status);
- }
- return response;
+ return RegionExecutionResult.create(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
+ status.message,
+ status);
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
- response.setAccepted(false);
- response.setMessage(e.toString());
-
response.setStatus(RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR,
e.toString()));
- return response;
+ return RegionExecutionResult.create(
+ false,
+ e.toString(),
+ RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR,
e.toString()));
} finally {
context.getRegionWriteValidationRWLock().readLock().unlock();
}
@@ -347,13 +347,11 @@ public class RegionWriteExecutor {
} else {
MetadataException metadataException = failingMeasurementMap.get(0);
LOGGER.warn(METADATA_ERROR_MSG, metadataException);
- result = new RegionExecutionResult();
- result.setAccepted(false);
- result.setMessage(metadataException.getMessage());
- result.setStatus(
+ return RegionExecutionResult.create(
+ false,
+ metadataException.getMessage(),
RpcUtils.getStatus(
metadataException.getErrorCode(),
metadataException.getMessage()));
- return result;
}
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
@@ -396,13 +394,11 @@ public class RegionWriteExecutor {
} else {
MetadataException metadataException =
failingMeasurementMap.values().iterator().next();
LOGGER.warn(METADATA_ERROR_MSG, metadataException);
- result = new RegionExecutionResult();
- result.setAccepted(false);
- result.setMessage(metadataException.getMessage());
- result.setStatus(
+ return RegionExecutionResult.create(
+ false,
+ metadataException.getMessage(),
RpcUtils.getStatus(
metadataException.getErrorCode(),
metadataException.getMessage()));
- return result;
}
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
@@ -459,11 +455,7 @@ public class RegionWriteExecutor {
}
TSStatus status = RpcUtils.getStatus(failingStatus);
- failingResult = new RegionExecutionResult();
- failingResult.setAccepted(false);
- failingResult.setMessage(status.getMessage());
- failingResult.setStatus(status);
- return failingResult;
+ return RegionExecutionResult.create(false, status.getMessage(),
status);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
@@ -698,11 +690,8 @@ public class RegionWriteExecutor {
try {
schemaRegion.checkSchemaQuota(path, size);
} catch (SchemaQuotaExceededException e) {
- RegionExecutionResult result = new RegionExecutionResult();
- result.setAccepted(false);
- result.setMessage(e.getMessage());
- result.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- return result;
+ return RegionExecutionResult.create(
+ false, e.getMessage(), RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
return null;
}
@@ -716,22 +705,17 @@ public class RegionWriteExecutor {
separateMeasurementAlreadyExistException(
failingStatus, executionStatus, alreadyExistingStatus);
- RegionExecutionResult result = new RegionExecutionResult();
+ boolean isAccepted = true;
TSStatus status;
if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
status = RpcUtils.SUCCESS_STATUS;
- result.setAccepted(true);
} else if (failingStatus.isEmpty()) {
status = RpcUtils.getStatus(alreadyExistingStatus);
- result.setAccepted(true);
} else {
status = RpcUtils.getStatus(failingStatus);
- result.setAccepted(false);
+ isAccepted = false;
}
-
- result.setMessage(status.getMessage());
- result.setStatus(status);
- return result;
+ return RegionExecutionResult.create(isAccepted, status.getMessage(),
status);
}
private void separateMeasurementAlreadyExistException(
@@ -784,11 +768,8 @@ public class RegionWriteExecutor {
? super.visitPipeEnrichedWritePlanNode(new
PipeEnrichedWritePlanNode(node), context)
: super.visitAlterTimeSeries(node, context);
} catch (MetadataException e) {
- RegionExecutionResult result = new RegionExecutionResult();
- result.setAccepted(true);
- result.setMessage(e.getMessage());
- result.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
- return result;
+ return RegionExecutionResult.create(
+ true, e.getMessage(), RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
}
@@ -810,15 +791,12 @@ public class RegionWriteExecutor {
if (templateSetInfo == null) {
// The activation has already been validated during analyzing.
// That means the template is being unset during the activation plan
transport.
- RegionExecutionResult result = new RegionExecutionResult();
- result.setAccepted(false);
String message =
String.format(
"Template is being unsetting from path %s. Please try
activating later.",
node.getPathSetTemplate());
- result.setMessage(message);
- result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
message));
- return result;
+ return RegionExecutionResult.create(
+ false, message, RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
message));
}
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId)
context.getRegionId());
@@ -858,15 +836,12 @@ public class RegionWriteExecutor {
if (templateSetInfo == null) {
// The activation has already been validated during analyzing.
// That means the template is being unset during the activation
plan transport.
- RegionExecutionResult result = new RegionExecutionResult();
- result.setAccepted(false);
String message =
String.format(
"Template is being unsetting from path %s. Please try
activating later.",
node.getPathSetTemplate(devicePath));
- result.setMessage(message);
- result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
message));
- return result;
+ return RegionExecutionResult.create(
+ false, message,
RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message));
}
RegionExecutionResult result =
checkQuotaBeforeCreatingTimeSeries(
@@ -906,17 +881,14 @@ public class RegionWriteExecutor {
if (templateSetInfo == null) {
// The activation has already been validated during analyzing.
// That means the template is being unset during the activation
plan transport.
- RegionExecutionResult result = new RegionExecutionResult();
- result.setAccepted(false);
String message =
String.format(
"Template is being unsetting from prefix path of %s.
Please try activating later.",
new PartialPath(
Arrays.copyOf(entry.getKey().getNodes(),
entry.getValue().right + 1))
.getFullPath());
- result.setMessage(message);
- result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
message));
- return result;
+ return RegionExecutionResult.create(
+ false, message,
RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message));
}
RegionExecutionResult result =
checkQuotaBeforeCreatingTimeSeries(
@@ -968,13 +940,11 @@ public class RegionWriteExecutor {
if (!failingMetadataException.isEmpty()) {
MetadataException metadataException =
failingMetadataException.get(0);
LOGGER.warn(METADATA_ERROR_MSG, metadataException);
- RegionExecutionResult result = new RegionExecutionResult();
- result.setAccepted(false);
- result.setMessage(metadataException.getMessage());
- result.setStatus(
+ return RegionExecutionResult.create(
+ false,
+ metadataException.getMessage(),
RpcUtils.getStatus(
metadataException.getErrorCode(),
metadataException.getMessage()));
- return result;
}
// step 2. make sure all source paths exist.
return receivedFromPipe
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 8c4e79f5d44..7b4a4e30490 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -455,7 +455,7 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
: readExecutor.execute(groupId, instance);
if (!readResult.isAccepted()) {
LOGGER.warn(readResult.getMessage());
- if (readResult.isNeedRetry()) {
+ if (readResult.isReadNeedRetry()) {
if (readResult.getStatus() != null
&& readResult.getStatus().getCode()
==
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {