This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch err-cli-opti-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/err-cli-opti-13 by this push:
new 9580cab5aa5 Optimized the error log for schema execution (#16982)
9580cab5aa5 is described below
commit 9580cab5aa55a25b4c6a697e667dd6647902a1b8
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 6 16:32:34 2026 +0800
Optimized the error log for schema execution (#16982)
---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 12 +
.../impl/schema/AlterLogicalViewProcedure.java | 10 +-
.../impl/schema/DataNodeRegionTaskExecutor.java | 19 +-
.../impl/schema/DeactivateTemplateProcedure.java | 32 +--
.../impl/schema/DeleteLogicalViewProcedure.java | 38 +--
.../impl/schema/DeleteTimeSeriesProcedure.java | 37 +--
.../procedure/impl/schema/SchemaUtils.java | 12 +-
.../impl/schema/SetTemplateProcedure.java | 12 +-
.../schema/DataNodeRegionTaskExecutorTest.java | 81 ++++++
.../schemaregion/SchemaExecutionVisitor.java | 66 +++--
.../schemaregion/SchemaExecutionVisitorTest.java | 277 +++++++++++++++++++++
11 files changed, 491 insertions(+), 105 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index e48054abb19..5670f80f247 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -163,6 +163,18 @@ public class RpcUtils {
return status;
}
+ public static TSStatus extractFailureStatues(final TSStatus input) {
+ return input.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+ ? new TSStatus(input.getCode())
+ .setMessage(input.getMessage())
+ .setSubStatus(
+ input.getSubStatus().stream()
+ .filter(
+ status -> status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ .collect(Collectors.toList()))
+ : input;
+ }
+
/**
* Convert from {@link TSStatusCode} to {@link TSStatus}, which has message
appended with existing
* status message
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
index a0537f14175..3e25675108e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
@@ -59,6 +59,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
public class AlterLogicalViewProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv,
AlterLogicalViewState> {
@@ -72,6 +73,8 @@ public class AlterLogicalViewProcedure
private transient PathPatternTree pathPatternTree;
private transient ByteBuffer patternTreeBytes;
+ protected final Map<TDataNodeLocation, TSStatus> failureMap = new
HashMap<>();
+
public AlterLogicalViewProcedure(final boolean isGeneratedByPipe) {
super(isGeneratedByPipe);
}
@@ -390,11 +393,14 @@ public class AlterLogicalViewProcedure
new ProcedureException(
new MetadataException(
String.format(
- "Alter view %s failed when [%s] because failed to
execute in all replicaset of schemaRegion %s. Failure nodes: %s",
+ "Alter view %s failed when [%s] because failed to
execute in all replicaset of schemaRegion %s. Failure nodes: %s, statuses: %s",
viewPathToSourceMap.keySet(),
taskName,
consensusGroupId.id,
- dataNodeLocationSet))));
+ dataNodeLocationSet.stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toSet()),
+ failureStatusList))));
interruptTask();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
index 64a68a59c49..2f657d42d0f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java
@@ -22,11 +22,13 @@ package org.apache.iotdb.confignode.procedure.impl.schema;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionGroupUtil.getAllReplicaDataNodeRegionGroupMap;
import static
org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionGroupUtil.getLeaderDataNodeRegionGroupMap;
@@ -48,7 +51,7 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> {
protected final CnToDnAsyncRequestType dataNodeRequestType;
protected final BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q>
dataNodeRequestGenerator;
-
+ protected final Map<TDataNodeLocation, TSStatus> failureMap = new
HashMap<>();
private boolean isInterrupted = false;
protected DataNodeRegionTaskExecutor(
@@ -221,5 +224,17 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> {
* executed.
*/
protected abstract void onAllReplicasetFailure(
- TConsensusGroupId consensusGroupId, Set<TDataNodeLocation>
dataNodeLocationSet);
+ final TConsensusGroupId consensusGroupId, final Set<TDataNodeLocation>
dataNodeLocationSet);
+
+ protected String printFailureMap() {
+ return failureMap.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> "DataNodeId: " + entry.getKey().getDataNodeId(),
+ entry ->
+ entry.getValue().getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+ ? entry.getValue().getSubStatus()
+ : entry.getValue()))
+ .toString();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java
index e1a94cf0934..b525bca7be4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java
@@ -143,7 +143,6 @@ public class DeactivateTemplateProcedure
if (targetSchemaRegionGroup.isEmpty()) {
return 0;
}
- List<TSStatus> successResult = new ArrayList<>();
DeactivateTemplateRegionTaskExecutor<TConstructSchemaBlackListWithTemplateReq>
constructBlackListTask =
new
DeactivateTemplateRegionTaskExecutor<TConstructSchemaBlackListWithTemplateReq>(
@@ -156,26 +155,11 @@ public class DeactivateTemplateProcedure
consensusGroupIdList, dataNodeRequest))) {
@Override
protected List<TConsensusGroupId> processResponseOfOneDataNode(
- TDataNodeLocation dataNodeLocation,
- List<TConsensusGroupId> consensusGroupIdList,
- TSStatus response) {
- List<TConsensusGroupId> failedRegionList = new ArrayList<>();
- if (response.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successResult.add(response);
- } else if (response.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- List<TSStatus> subStatusList = response.getSubStatus();
- for (int i = 0; i < subStatusList.size(); i++) {
- if (subStatusList.get(i).getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successResult.add(subStatusList.get(i));
- } else {
- failedRegionList.add(consensusGroupIdList.get(i));
- }
- }
- } else {
- failedRegionList.addAll(consensusGroupIdList);
- }
- return failedRegionList;
+ final TDataNodeLocation dataNodeLocation,
+ final List<TConsensusGroupId> consensusGroupIdList,
+ final TSStatus response) {
+ return processResponseOfOneDataNodeWithSuccessResult(
+ dataNodeLocation, consensusGroupIdList, response);
}
};
constructBlackListTask.execute();
@@ -185,7 +169,7 @@ public class DeactivateTemplateProcedure
}
long preDeletedNum = 0;
- for (TSStatus resp : successResult) {
+ for (TSStatus resp : constructBlackListTask.getSuccessResult()) {
preDeletedNum += Long.parseLong(resp.getMessage());
}
return preDeletedNum;
@@ -489,12 +473,12 @@ public class DeactivateTemplateProcedure
new ProcedureException(
new MetadataException(
String.format(
- "Deactivate template of %s failed when [%s] because
failed to execute in all replicaset of %s %s. Failure nodes: %s",
+ "Deactivate template of %s failed when [%s] because
failed to execute in all replicaset of %s %s. Failure: %s",
requestMessage,
taskName,
consensusGroupId.type,
consensusGroupId.id,
- dataNodeLocationSet))));
+ printFailureMap()))));
interruptTask();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
index 4d92454fd22..36833c29e65 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java
@@ -53,7 +53,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -135,7 +134,6 @@ public class DeleteLogicalViewProcedure
if (targetSchemaRegionGroup.isEmpty()) {
return 0;
}
- List<TSStatus> successResult = new ArrayList<>();
DeleteLogicalViewRegionTaskExecutor<TConstructViewSchemaBlackListReq>
constructBlackListTask =
new
DeleteLogicalViewRegionTaskExecutor<TConstructViewSchemaBlackListReq>(
"construct view schema engine black list",
@@ -146,25 +144,11 @@ public class DeleteLogicalViewProcedure
new TConstructViewSchemaBlackListReq(consensusGroupIdList,
patternTreeBytes))) {
@Override
protected List<TConsensusGroupId> processResponseOfOneDataNode(
- TDataNodeLocation dataNodeLocation,
- List<TConsensusGroupId> consensusGroupIdList,
- TSStatus response) {
- List<TConsensusGroupId> failedRegionList = new ArrayList<>();
- if (response.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successResult.add(response);
- } else if (response.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- List<TSStatus> subStatusList = response.getSubStatus();
- for (int i = 0; i < subStatusList.size(); i++) {
- if (subStatusList.get(i).getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successResult.add(subStatusList.get(i));
- } else {
- failedRegionList.add(consensusGroupIdList.get(i));
- }
- }
- } else {
- failedRegionList.addAll(consensusGroupIdList);
- }
- return failedRegionList;
+ final TDataNodeLocation dataNodeLocation,
+ final List<TConsensusGroupId> consensusGroupIdList,
+ final TSStatus response) {
+ return processResponseOfOneDataNodeWithSuccessResult(
+ dataNodeLocation, consensusGroupIdList, response);
}
};
constructBlackListTask.execute();
@@ -173,11 +157,9 @@ public class DeleteLogicalViewProcedure
return 0;
}
- long preDeletedNum = 0;
- for (TSStatus resp : successResult) {
- preDeletedNum += Long.parseLong(resp.getMessage());
- }
- return preDeletedNum;
+ return constructBlackListTask.getSuccessResult().stream()
+ .mapToLong(resp -> Long.parseLong(resp.getMessage()))
+ .reduce(0L, Long::sum);
}
private void invalidateCache(ConfigNodeProcedureEnv env) {
@@ -378,8 +360,8 @@ public class DeleteLogicalViewProcedure
new ProcedureException(
new MetadataException(
String.format(
- "Delete view %s failed when [%s] because failed to
execute in all replicaset of schemaRegion %s. Failure nodes: %s",
- requestMessage, taskName, consensusGroupId.id,
dataNodeLocationSet))));
+ "Delete view %s failed when [%s] because failed to
execute in all replicaset of schemaRegion %s. Failures: %s",
+ requestMessage, taskName, consensusGroupId.id,
printFailureMap()))));
interruptTask();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
index c7ad4c88021..e23f41245cd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
@@ -55,6 +55,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -146,7 +147,6 @@ public class DeleteTimeSeriesProcedure
return 0;
}
isAllLogicalView = true;
- final List<TSStatus> successResult = new ArrayList<>();
final DeleteTimeSeriesRegionTaskExecutor<TConstructSchemaBlackListReq>
constructBlackListTask =
new DeleteTimeSeriesRegionTaskExecutor<TConstructSchemaBlackListReq>(
"construct schema engine black list",
@@ -160,37 +160,24 @@ public class DeleteTimeSeriesProcedure
final TDataNodeLocation dataNodeLocation,
final List<TConsensusGroupId> consensusGroupIdList,
final TSStatus response) {
- final List<TConsensusGroupId> failedRegionList = new ArrayList<>();
if (response.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
isAllLogicalView = false;
- successResult.add(response);
} else if (response.getCode() ==
TSStatusCode.ONLY_LOGICAL_VIEW.getStatusCode()) {
successResult.add(response);
- } else if (response.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- final List<TSStatus> subStatusList = response.getSubStatus();
- for (int i = 0; i < subStatusList.size(); i++) {
- if (subStatusList.get(i).getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successResult.add(subStatusList.get(i));
- } else {
- failedRegionList.add(consensusGroupIdList.get(i));
- }
- }
- } else {
- failedRegionList.addAll(consensusGroupIdList);
+ return Collections.emptyList();
}
- return failedRegionList;
+ return processResponseOfOneDataNodeWithSuccessResult(
+ dataNodeLocation, consensusGroupIdList, response);
}
};
constructBlackListTask.execute();
- if (isFailed()) {
- return 0;
- }
-
- return successResult.stream()
- .mapToLong(resp -> Long.parseLong(resp.getMessage()))
- .reduce(Long::sum)
- .orElse(0L);
+ return !isFailed()
+ ? constructBlackListTask.getSuccessResult().stream()
+ .mapToLong(resp -> Long.parseLong(resp.getMessage()))
+ .reduce(Long::sum)
+ .orElse(0L)
+ : 0;
}
private void invalidateCache(final ConfigNodeProcedureEnv env) {
@@ -454,12 +441,12 @@ public class DeleteTimeSeriesProcedure
new ProcedureException(
new MetadataException(
String.format(
- "Delete time series %s failed when [%s] because failed
to execute in all replicaset of %s %s. Failure nodes: %s",
+ "Delete time series %s failed when [%s] because failed
to execute in all replicaset of %s %s. Failures: %s",
requestMessage,
taskName,
consensusGroupId.type,
consensusGroupId.id,
- dataNodeLocationSet))));
+ printFailureMap()))));
interruptTask();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java
index 25cd4704dc6..7499a415ed5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import java.io.ByteArrayOutputStream;
@@ -168,6 +169,7 @@ public class SchemaUtils {
respList.add(response);
List<TConsensusGroupId> failedRegionList = new ArrayList<>();
if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failureMap.remove(dataNodeLocation);
return failedRegionList;
}
@@ -181,6 +183,12 @@ public class SchemaUtils {
} else {
failedRegionList.addAll(consensusGroupIdList);
}
+ if (!failedRegionList.isEmpty()) {
+ failureMap.put(
+ dataNodeLocation,
RpcUtils.extractFailureStatues(response.getStatus()));
+ } else {
+ failureMap.remove(dataNodeLocation);
+ }
return failedRegionList;
}
@@ -190,8 +198,8 @@ public class SchemaUtils {
exception[0] =
new MetadataException(
String.format(
- "Failed to execute in all replicaset of
schemaRegion %s when checking templates on path %s. Failure nodes: %s",
- consensusGroupId.id, deleteDatabasePatternPaths,
dataNodeLocationSet));
+ "Failed to execute in all replicaset of
schemaRegion %s when checking templates on path %s. Failures: %s",
+ consensusGroupId.id, deleteDatabasePatternPaths,
printFailureMap()));
interruptTask();
}
};
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
index d812e715a6c..a450540527d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -290,6 +291,7 @@ public class SetTemplateProcedure
respList.add(response);
List<TConsensusGroupId> failedRegionList = new ArrayList<>();
if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failureMap.remove(dataNodeLocation);
return failedRegionList;
}
@@ -305,6 +307,12 @@ public class SetTemplateProcedure
} else {
failedRegionList.addAll(consensusGroupIdList);
}
+ if (!failedRegionList.isEmpty()) {
+ failureMap.put(
+ dataNodeLocation,
RpcUtils.extractFailureStatues(response.getStatus()));
+ } else {
+ failureMap.remove(dataNodeLocation);
+ }
return failedRegionList;
}
@@ -316,11 +324,11 @@ public class SetTemplateProcedure
new MetadataException(
String.format(
"Set template %s to %s failed when [check time
series existence on DataNode] because "
- + "failed to check time series existence
in all replicaset of schemaRegion %s. Failure nodes: %s",
+ + "failed to check time series existence
in all replicaset of schemaRegion %s. Failures: %s",
templateName,
templateSetPath,
consensusGroupId.id,
- dataNodeLocationSet))));
+ printFailureMap()))));
interruptTask();
}
};
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutorTest.java
new file mode 100644
index 00000000000..d1286cc3587
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutorTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+public class DataNodeRegionTaskExecutorTest {
+
+ @Test
+ public void testPrintFailureMap() {
+ final TestRegionTaskExecutor executor = new TestRegionTaskExecutor();
+ executor.processResponseOfOneDataNode(
+ new TDataNodeLocation().setDataNodeId(0),
+ Collections.singletonList(new
TConsensusGroupId(TConsensusGroupType.DataRegion, 3)),
+ StatusUtils.OK);
+ executor.processResponseOfOneDataNode(
+ new TDataNodeLocation().setDataNodeId(0),
+ Arrays.asList(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)),
+ RpcUtils.getStatus(
+ Arrays.asList(
+ StatusUtils.OK, new
TSStatus(TSStatusCode.ILLEGAL_PATH.getStatusCode()))));
+ Assert.assertEquals("{DataNodeId: 0=[TSStatus(code:509)]}",
executor.printFailureMap());
+
+ executor.processResponseOfOneDataNodeWithSuccessResult(
+ new TDataNodeLocation().setDataNodeId(0),
+ Arrays.asList(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)),
+ RpcUtils.getStatus(
+ Arrays.asList(
+ StatusUtils.OK, new
TSStatus(TSStatusCode.ILLEGAL_PATH.getStatusCode()))));
+ Assert.assertTrue(StatusUtils.OK == executor.getSuccessResult().get(0));
+ }
+
+ private static class TestRegionTaskExecutor extends
DataNodeTSStatusTaskExecutor<Void> {
+
+ private TestRegionTaskExecutor() {
+ super(new ConfigNodeProcedureEnv(null, null), null, false, null, null);
+ }
+
+ @Override
+ protected void onAllReplicasetFailure(
+ final TConsensusGroupId consensusGroupId,
+ final Set<TDataNodeLocation> dataNodeLocationSet) {
+ interruptTask();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index b392ac0f479..8438ddffc6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import
org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -75,6 +76,8 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -83,7 +86,7 @@ import java.util.Objects;
/** Schema write {@link PlanNode} visitor */
public class SchemaExecutionVisitor extends PlanVisitor<TSStatus,
ISchemaRegion> {
- private static final Logger logger =
LoggerFactory.getLogger(SchemaExecutionVisitor.class);
+ private static Logger logger =
LoggerFactory.getLogger(SchemaExecutionVisitor.class);
@Override
public TSStatus visitCreateTimeSeries(
@@ -91,7 +94,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
try {
schemaRegion.createTimeSeries(node, -1);
} catch (final MetadataException e) {
- logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ logMetaDataException(String.format("%s: MetaData error: ",
IoTDBConstant.GLOBAL_DB_NAME), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
@@ -118,7 +121,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.createAlignedTimeSeries(node);
}
} catch (final MetadataException e) {
- logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ logMetaDataException(String.format("%s: MetaData error: ",
IoTDBConstant.GLOBAL_DB_NAME), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
@@ -144,7 +147,8 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
((CreateTimeSeriesPlanImpl)
createTimeSeriesPlan).setWithMerge(node.isGeneratedByPipe());
schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
} catch (final MetadataException e) {
- logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME,
e);
+ logMetaDataException(
+ String.format("%s: MetaData error: ",
IoTDBConstant.GLOBAL_DB_NAME), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
}
@@ -286,7 +290,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
RpcUtils.getStatus(
e.getErrorCode(),
PartialPath.transformDataToString(e.getMeasurementPath())));
} catch (final MetadataException e) {
- logger.warn("{}: MetaData error: ", e.getMessage(), e);
+ logMetaDataException(String.format("%s: MetaData error: ",
e.getMessage()), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
}
@@ -421,7 +425,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
break;
}
} catch (MetadataException e) {
- logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ logMetaDataException(String.format("%s: MetaData error: ",
IoTDBConstant.GLOBAL_DB_NAME), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (IOException e) {
logger.error("{}: IO error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
@@ -439,7 +443,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.activateSchemaTemplate(node, template);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -462,7 +466,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
if (e.getErrorCode() ==
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
alreadyActivatedDeviceList.add(entry.getKey());
} else {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
@@ -470,7 +474,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
if (!alreadyActivatedDeviceList.isEmpty()) {
final TemplateIsInUseException e =
new TemplateIsInUseException(alreadyActivatedDeviceList.toString());
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
return statusList.isEmpty() ? RpcUtils.SUCCESS_STATUS :
RpcUtils.getStatus(statusList);
@@ -494,7 +498,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
"Device Template has already been activated on path %s,
there's no need to activate again.",
entry.getKey()));
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -513,7 +517,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
: TSStatusCode.SUCCESS_STATUS,
String.valueOf(preDeletedNumAndIsAllLogicalView.getLeft()));
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -525,7 +529,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.rollbackSchemaBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -537,7 +541,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.deleteTimeseriesInBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -550,7 +554,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
TSStatusCode.SUCCESS_STATUS,
String.valueOf(schemaRegion.constructSchemaBlackListWithTemplate(node)));
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -562,7 +566,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.rollbackSchemaBlackListWithTemplate(node);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -574,7 +578,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.deactivateTemplateInBlackList(node);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -629,7 +633,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
TSStatusCode.SUCCESS_STATUS,
String.valueOf(schemaRegion.constructLogicalViewBlackList(node.getPatternTree())));
} catch (MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -641,7 +645,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.rollbackLogicalViewBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (final MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -652,7 +656,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion.deleteLogicalView(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
- logger.error(e.getMessage(), e);
+ logMetaDataException(e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@@ -689,7 +693,29 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
@Override
- public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
+ public TSStatus visitPlan(final PlanNode node, final ISchemaRegion context) {
return null;
}
+
+ public static void logMetaDataException(
+ final @Nonnull String message, final @Nonnull MetadataException e) {
+ if (e.isUserException()) {
+ logger.info(message);
+ } else {
+ logger.error(message, e);
+ }
+ }
+
+ public static void logMetaDataException(final @Nonnull MetadataException e) {
+ if (e.isUserException()) {
+ logger.info(e.getMessage());
+ } else {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ @TestOnly
+ public static void setLogger(final Logger logger) {
+ SchemaExecutionVisitor.logger = logger;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitorTest.java
new file mode 100644
index 00000000000..b36f585e117
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitorTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.statemachine.schemaregion;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+
+public class SchemaExecutionVisitorTest {
+ @Test
+ public void testAuthLogger() {
+ SchemaExecutionVisitor.setLogger(
+ new Logger() {
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public boolean isTraceEnabled() {
+ return false;
+ }
+
+ @Override
+ public void trace(String msg) {}
+
+ @Override
+ public void trace(String format, Object arg) {}
+
+ @Override
+ public void trace(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void trace(String format, Object... arguments) {}
+
+ @Override
+ public void trace(String msg, Throwable t) {}
+
+ @Override
+ public boolean isTraceEnabled(Marker marker) {
+ return false;
+ }
+
+ @Override
+ public void trace(Marker marker, String msg) {}
+
+ @Override
+ public void trace(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void trace(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void trace(Marker marker, String format, Object... argArray)
{}
+
+ @Override
+ public void trace(Marker marker, String msg, Throwable t) {}
+
+ @Override
+ public boolean isDebugEnabled() {
+ return false;
+ }
+
+ @Override
+ public void debug(String msg) {}
+
+ @Override
+ public void debug(String format, Object arg) {}
+
+ @Override
+ public void debug(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void debug(String format, Object... arguments) {}
+
+ @Override
+ public void debug(String msg, Throwable t) {}
+
+ @Override
+ public boolean isDebugEnabled(Marker marker) {
+ return false;
+ }
+
+ @Override
+ public void debug(Marker marker, String msg) {}
+
+ @Override
+ public void debug(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void debug(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void debug(Marker marker, String format, Object... arguments)
{}
+
+ @Override
+ public void debug(Marker marker, String msg, Throwable t) {}
+
+ @Override
+ public boolean isInfoEnabled() {
+ return true;
+ }
+
+ @Override
+ public void info(String msg) {}
+
+ @Override
+ public void info(String format, Object arg) {}
+
+ @Override
+ public void info(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void info(String format, Object... arguments) {}
+
+ @Override
+ public void info(String msg, Throwable t) {}
+
+ @Override
+ public boolean isInfoEnabled(Marker marker) {
+ return true;
+ }
+
+ @Override
+ public void info(Marker marker, String msg) {}
+
+ @Override
+ public void info(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void info(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void info(Marker marker, String format, Object... arguments)
{}
+
+ @Override
+ public void info(Marker marker, String msg, Throwable t) {}
+
+ // Warn
+ @Override
+ public boolean isWarnEnabled() {
+ return true;
+ }
+
+ @Override
+ public void warn(String msg) {}
+
+ @Override
+ public void warn(String format, Object arg) {}
+
+ @Override
+ public void warn(String format, Object... arguments) {}
+
+ @Override
+ public void warn(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void warn(String msg, Throwable t) {}
+
+ @Override
+ public boolean isWarnEnabled(Marker marker) {
+ return false;
+ }
+
+ @Override
+ public void warn(Marker marker, String msg) {}
+
+ @Override
+ public void warn(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void warn(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void warn(Marker marker, String format, Object... arguments)
{}
+
+ @Override
+ public void warn(Marker marker, String msg, Throwable t) {}
+
+ @Override
+ public boolean isErrorEnabled() {
+ return true;
+ }
+
+ @Override
+ public void error(String msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(String format, Object arg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(String format, Object arg1, Object arg2) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(String format, Object... arguments) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(String msg, Throwable t) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isErrorEnabled(Marker marker) {
+ return true;
+ }
+
+ @Override
+ public void error(Marker marker, String msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(Marker marker, String format, Object arg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(Marker marker, String format, Object arg1, Object
arg2) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(Marker marker, String format, Object... arguments)
{
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(Marker marker, String msg, Throwable t) {
+ throw new UnsupportedOperationException();
+ }
+ });
+ SchemaExecutionVisitor.logMetaDataException(new MetadataException("",
true));
+ SchemaExecutionVisitor.logMetaDataException("", new MetadataException("",
true));
+ try {
+ SchemaExecutionVisitor.logMetaDataException(new MetadataException("",
false));
+ Assert.fail();
+ } catch (final UnsupportedOperationException e) {
+ // Expected
+ }
+ try {
+ SchemaExecutionVisitor.logMetaDataException(new MetadataException("",
false));
+ Assert.fail();
+ } catch (final UnsupportedOperationException e) {
+ // Expected
+ }
+
SchemaExecutionVisitor.setLogger(LoggerFactory.getLogger(SchemaExecutionVisitor.class));
+ }
+}