This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch rc/1.1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4a5b900419eec61171a315c9f6e586487948fa55 Author: Marcos_Zyk <[email protected]> AuthorDate: Thu Mar 16 16:15:10 2023 +0800 [To rel/1.1] Fix dispatch result collection logic (#9323) --- .../execution/executor/RegionWriteExecutor.java | 149 ++++++++------------- .../metedata/write/CreateMultiTimeSeriesNode.java | 6 +- .../plan/node/metedata/write/MeasurementGroup.java | 36 ++--- .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 34 +++++ .../scheduler/FragmentInstanceDispatcherImpl.java | 32 ++++- 5 files changed, 132 insertions(+), 125 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java index 71e870cc67..69ca7a4f3b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java @@ -462,53 +462,10 @@ public class RegionWriteExecutor { } measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); - RegionExecutionResult executionResult = - super.visitInternalCreateTimeSeries(node, context); - - if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { - return executionResult; - } - - TSStatus executionStatus = executionResult.getStatus(); - - // separate the measurement_already_exist exception and other exceptions process, - // measurement_already_exist exception is acceptable due to concurrent timeseries creation - if (failingStatus.isEmpty()) { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - // there's only measurement_already_exist exception - alreadyExistingStatus.addAll(executionStatus.getSubStatus()); - } else { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } - } else { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } - } - - RegionExecutionResult result = new RegionExecutionResult(); - TSStatus status; - if (failingStatus.isEmpty()) { - status = RpcUtils.getStatus(alreadyExistingStatus); - result.setAccepted(true); - } else { - status = RpcUtils.getStatus(failingStatus); - result.setAccepted(false); - } - - result.setMessage(status.getMessage()); - result.setStatus(status); - return result; + return processExecutionResultOfInternalCreateSchema( + super.visitInternalCreateTimeSeries(node, context), + failingStatus, + alreadyExistingStatus); } finally { context.getRegionWriteValidationRWLock().writeLock().unlock(); } @@ -562,59 +519,65 @@ public class RegionWriteExecutor { measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); } - RegionExecutionResult executionResult = - super.visitInternalCreateMultiTimeSeries(node, context); - - if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { - return executionResult; - } - - TSStatus executionStatus = executionResult.getStatus(); + return processExecutionResultOfInternalCreateSchema( + super.visitInternalCreateMultiTimeSeries(node, context), + failingStatus, + alreadyExistingStatus); + } finally { + context.getRegionWriteValidationRWLock().writeLock().unlock(); + } + } else { + return super.visitInternalCreateMultiTimeSeries(node, context); + } + } - // separate the measurement_already_exist exception and other exceptions process, - // measurement_already_exist exception is acceptable due to concurrent timeseries creation - if (failingStatus.isEmpty()) { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - // there's only measurement_already_exist exception - alreadyExistingStatus.addAll(executionStatus.getSubStatus()); - } else { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } + private RegionExecutionResult processExecutionResultOfInternalCreateSchema( + RegionExecutionResult executionResult, + List<TSStatus> failingStatus, + List<TSStatus> alreadyExistingStatus) { + TSStatus executionStatus = executionResult.getStatus(); + + // separate the measurement_already_exist exception and other exceptions process, + // measurement_already_exist exception is acceptable due to concurrent timeseries creation + if (failingStatus.isEmpty()) { + if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + if (executionStatus.getSubStatus().get(0).getCode() + == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { + // there's only measurement_already_exist exception + alreadyExistingStatus.addAll(executionStatus.getSubStatus()); } else { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } + failingStatus.addAll(executionStatus.getSubStatus()); } - - RegionExecutionResult result = new RegionExecutionResult(); - TSStatus status; - if (failingStatus.isEmpty()) { - status = RpcUtils.getStatus(alreadyExistingStatus); - result.setAccepted(true); - } else { - status = RpcUtils.getStatus(failingStatus); - result.setAccepted(false); + } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failingStatus.add(executionStatus); + } + } else { + if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + if (executionStatus.getSubStatus().get(0).getCode() + != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { + failingStatus.addAll(executionStatus.getSubStatus()); } - - result.setMessage(status.getMessage()); - result.setStatus(status); - return result; - } finally { - context.getRegionWriteValidationRWLock().writeLock().unlock(); + } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failingStatus.add(executionStatus); } + } + + RegionExecutionResult result = new RegionExecutionResult(); + TSStatus status; + if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { + status = RpcUtils.SUCCESS_STATUS; + result.setAccepted(true); + } else if (failingStatus.isEmpty()) { + status = RpcUtils.getStatus(alreadyExistingStatus); + result.setAccepted(true); } else { - return super.visitInternalCreateMultiTimeSeries(node, context); + status = RpcUtils.getStatus(failingStatus); + result.setAccepted(false); } + + result.setMessage(status.getMessage()); + result.setStatus(status); + return result; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java index a739020e30..32445aee12 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java @@ -83,8 +83,10 @@ public class CreateMultiTimeSeriesNode extends WritePlanNode { measurementGroupMap.put(devicePath, measurementGroup); } - measurementGroup.addMeasurement( - paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i)); + if (!measurementGroup.addMeasurement( + paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i))) { + continue; + } if (propsList != null) { measurementGroup.addProps(propsList.get(i)); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java index f5bd2ccc52..5123f0ba0b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java @@ -28,6 +28,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -44,6 +45,8 @@ public class MeasurementGroup { private List<Map<String, String>> tagsList; private List<Map<String, String>> attributesList; + private final transient Set<String> measurementSet = new HashSet<>(); + public List<String> getMeasurements() { return measurements; } @@ -80,15 +83,20 @@ public class MeasurementGroup { return attributesList; } - public void addMeasurement( + public boolean addMeasurement( String measurement, TSDataType dataType, TSEncoding encoding, CompressionType compressionType) { + if (measurementSet.contains(measurement)) { + return false; + } measurements.add(measurement); + measurementSet.add(measurement); dataTypes.add(dataType); encodings.add(encoding); compressors.add(compressionType); + return true; } public void addAlias(String alias) { @@ -119,29 +127,6 @@ public class MeasurementGroup { attributesList.add(attributes); } - public void removeMeasurement(int index) { - measurements.remove(index); - dataTypes.remove(index); - encodings.remove(index); - compressors.remove(index); - - if (aliasList != null) { - aliasList.remove(index); - } - - if (propsList != null) { - propsList.remove(index); - } - - if (tagsList != null) { - tagsList.remove(index); - } - - if (attributesList != null) { - attributesList.remove(index); - } - } - public void removeMeasurements(Set<Integer> indexSet) { int restSize = this.measurements.size() - indexSet.size(); List<String> measurements = new ArrayList<>(restSize); @@ -156,6 +141,7 @@ public class MeasurementGroup { for (int i = 0; i < this.measurements.size(); i++) { if (indexSet.contains(i)) { + measurementSet.remove(this.measurements.get(i)); continue; } measurements.add(this.measurements.get(i)); @@ -217,6 +203,7 @@ public class MeasurementGroup { MeasurementGroup subMeasurementGroup; subMeasurementGroup = new MeasurementGroup(); subMeasurementGroup.measurements = measurements.subList(startIndex, endIndex); + subMeasurementGroup.measurementSet.addAll(subMeasurementGroup.measurements); subMeasurementGroup.dataTypes = dataTypes.subList(startIndex, endIndex); subMeasurementGroup.encodings = encodings.subList(startIndex, endIndex); subMeasurementGroup.compressors = compressors.subList(startIndex, endIndex); @@ -359,6 +346,7 @@ public class MeasurementGroup { for (int i = 0; i < size; i++) { measurements.add(ReadWriteIOUtils.readString(byteBuffer)); } + measurementSet.addAll(measurements); dataTypes = new ArrayList<>(); for (int i = 0; i < size; i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java index f19de2323f..29a5f80f62 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; @@ -32,6 +33,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -87,6 +89,38 @@ public class AsyncPlanNodeSender { } } + public List<TSStatus> getFailureStatusList() { + List<TSStatus> failureStatusList = new ArrayList<>(); + TSStatus status; + for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) { + status = entry.getValue().getStatus(); + if (!entry.getValue().accepted) { + if (status == null) { + logger.warn( + "dispatch write failed. message: {}, node {}", + entry.getValue().message, + instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); + failureStatusList.add( + RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage())); + } else { + logger.warn( + "dispatch write failed. status: {}, code: {}, message: {}, node {}", + entry.getValue().status, + TSStatusCode.representOf(status.code), + entry.getValue().message, + instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); + failureStatusList.add(status); + } + } else { + // some expected and accepted status except SUCCESS_STATUS need to be returned + if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failureStatusList.add(status); + } + } + } + return failureStatusList; + } + public Future<FragInstanceDispatchResult> getResult() { for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) { if (!entry.getValue().accepted) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index 3226884b56..0719b86452 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -177,19 +177,21 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { AsyncPlanNodeSender asyncPlanNodeSender = new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances); asyncPlanNodeSender.sendAll(); + + List<TSStatus> dataNodeFailureList = new ArrayList<>(); + // sync dispatch to local long localScheduleStartTime = System.nanoTime(); for (FragmentInstance localInstance : localInstances) { try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) { dispatchOneInstance(localInstance); } catch (FragmentInstanceDispatchException e) { - return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); + dataNodeFailureList.add(e.getFailureStatus()); } catch (Throwable t) { logger.warn("[DispatchFailed]", t); - return immediateFuture( - new FragInstanceDispatchResult( - RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()))); + dataNodeFailureList.add( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())); } } PerformanceOverviewMetricsManager.recordScheduleLocalCost( @@ -205,7 +207,25 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { RpcUtils.getStatus( TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage()))); } - return asyncPlanNodeSender.getResult(); + + dataNodeFailureList.addAll(asyncPlanNodeSender.getFailureStatusList()); + + if (dataNodeFailureList.isEmpty()) { + return immediateFuture(new FragInstanceDispatchResult(true)); + } + if (instances.size() == 1) { + return immediateFuture(new FragInstanceDispatchResult(dataNodeFailureList.get(0))); + } else { + List<TSStatus> failureStatusList = new ArrayList<>(); + for (TSStatus dataNodeFailure : dataNodeFailureList) { + if (dataNodeFailure.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + failureStatusList.addAll(dataNodeFailure.getSubStatus()); + } else { + failureStatusList.add(dataNodeFailure); + } + } + return immediateFuture(new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList))); + } } private void dispatchOneInstance(FragmentInstance instance)
