This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 64b91af2d8e [To rel/1.2] Fix error msg of altering not existing view (#10252)
64b91af2d8e is described below
commit 64b91af2d8e4207bfb5647d5fe7403b6689609f9
Author: Marcos_Zyk <[email protected]>
AuthorDate: Wed Jun 21 14:01:33 2023 +0800
[To rel/1.2] Fix error msg of altering not existing view (#10252)
---
.../async/handlers/rpc/SchemaUpdateRPCHandler.java | 6 +--
.../iotdb/confignode/manager/ProcedureManager.java | 11 ++++-
.../impl/schema/AlterLogicalViewProcedure.java | 47 ++++++++++++++++++----
.../metadata/visitor/SchemaExecutionVisitor.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 8 +++-
5 files changed, 60 insertions(+), 14 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
index 9043a653cdf..3980e6e0c1b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
@@ -54,10 +54,10 @@ public class SchemaUpdateRPCHandler extends
AsyncTSStatusRPCHandler {
LOGGER.info("Successfully {} on DataNode: {}", requestType,
formattedTargetLocation);
} else if (tsStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
dataNodeLocationMap.remove(requestId);
- LOGGER.error(
+ LOGGER.warn(
"Failed to {} on DataNode {}, {}", requestType,
formattedTargetLocation, tsStatus);
} else {
- LOGGER.error(
+ LOGGER.warn(
"Failed to {} on DataNode {}, {}", requestType,
formattedTargetLocation, tsStatus);
}
countDownLatch.countDown();
@@ -73,7 +73,7 @@ public class SchemaUpdateRPCHandler extends
AsyncTSStatusRPCHandler {
+ targetDataNode.getInternalEndPoint()
+ "}"
+ e.getMessage();
- LOGGER.error(errorMsg);
+ LOGGER.warn(errorMsg);
countDownLatch.countDown();
responseMap.put(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index a0cf05716a1..74a1f26fb13 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -88,6 +88,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -100,6 +101,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -906,7 +908,14 @@ public class ProcedureManager {
} else {
if (finishedProcedure.getException().getCause() instanceof
IoTDBException) {
IoTDBException e = (IoTDBException)
finishedProcedure.getException().getCause();
- statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ if (e instanceof BatchProcessException) {
+ statusList.add(
+ RpcUtils.getStatus(
+ Arrays.stream(((BatchProcessException)
e).getFailingStatus())
+ .collect(Collectors.toList())));
+ } else {
+ statusList.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
+ }
} else {
statusList.add(
StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
index c1802db6214..09047b5dcbd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
@@ -39,6 +40,7 @@ import
org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import
org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
import
org.apache.iotdb.confignode.procedure.state.schema.AlterLogicalViewState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.metadata.view.ViewNotExistException;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
@@ -167,7 +169,7 @@ public class AlterLogicalViewProcedure
ViewExpression.serialize(viewEntry.getValue(), stream);
}
} catch (IOException e) {
- throw new RuntimeException(e);
+ // won't reach here
}
viewMapBinaryList.add(ByteBuffer.wrap(stream.toByteArray()));
}
@@ -175,6 +177,9 @@ public class AlterLogicalViewProcedure
return req;
});
regionTaskExecutor.execute();
+ if (isFailed()) {
+ return;
+ }
invalidateCache(env);
}
@@ -241,12 +246,11 @@ public class AlterLogicalViewProcedure
try {
patternTree.serialize(dataOutputStream);
} catch (IOException ignored) {
-
+ // won't reach here
}
- ByteBuffer patternTreeBytes =
ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
this.pathPatternTree = patternTree;
- this.patternTreeBytes = patternTreeBytes;
+ this.patternTreeBytes =
ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
}
@Override
@@ -298,6 +302,8 @@ public class AlterLogicalViewProcedure
private final String taskName;
+ private final List<TSStatus> failureStatusList = new ArrayList<>();
+
AlterLogicalViewRegionTaskExecutor(
String taskName,
ConfigNodeProcedureEnv env,
@@ -319,10 +325,17 @@ public class AlterLogicalViewProcedure
}
if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- List<TSStatus> subStatus = response.getSubStatus();
- for (int i = 0; i < subStatus.size(); i++) {
- if (subStatus.get(i).getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failedRegionList.add(consensusGroupIdList.get(i));
+ List<TSStatus> subStatusList = response.getSubStatus();
+ TSStatus subStatus;
+ for (int i = 0; i < subStatusList.size(); i++) {
+ subStatus = subStatusList.get(i);
+ if (subStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (subStatus.getCode() ==
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+ failedRegionList.add(consensusGroupIdList.get(i));
+ } else {
+ collectFailure(subStatus);
+ interruptTask();
+ }
}
}
} else {
@@ -331,6 +344,24 @@ public class AlterLogicalViewProcedure
return failedRegionList;
}
+ private void collectFailure(TSStatus failureStatus) {
+ if (failureStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ failureStatusList.addAll(failureStatus.getSubStatus());
+ } else {
+ failureStatusList.add(failureStatus);
+ }
+ if (failureStatusList.size() == 1) {
+ setFailure(
+ new ProcedureException(
+ new IoTDBException(
+ failureStatusList.get(0).getMessage(),
failureStatusList.get(0).getCode())));
+ } else {
+ setFailure(
+ new ProcedureException(
+ new BatchProcessException(failureStatusList.toArray(new
TSStatus[0]))));
+ }
+ }
+
@Override
protected void onAllReplicasetFailure(
TConsensusGroupId consensusGroupId, Set<TDataNodeLocation>
dataNodeLocationSet) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 515001e1af9..33c9a804a3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -475,7 +475,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.alterLogicalView(
SchemaRegionWritePlanFactory.getAlterLogicalViewPlan(entry.getKey(),
entry.getValue()));
} catch (MetadataException e) {
- logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ logger.warn("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 2cd03420959..a8ad1d92e15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -102,6 +102,7 @@ import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -1837,7 +1838,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
"Failed to execute alter view {}, status is {}.",
alterLogicalViewStatement.getTargetPathList(),
tsStatus);
- future.setException(new IoTDBException(tsStatus.getMessage(),
tsStatus.getCode()));
+ if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
{
+ future.setException(
+ new BatchProcessException(tsStatus.subStatus.toArray(new
TSStatus[0])));
+ } else {
+ future.setException(new IoTDBException(tsStatus.getMessage(),
tsStatus.getCode()));
+ }
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}