This is an automated email from the ASF dual-hosted git repository. yuyuankang pushed a commit to branch kyy2 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 693b1450ac6f0e7fd72268eac9fdef757fd3d60d Author: Ring-k <[email protected]> AuthorDate: Sun Jul 12 18:00:03 2020 +0800 remove backip --- .../cluster/server/member/MetaGroupMember.java | 25 +++++++----- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 34 +++++++++++++--- .../iotdb/db/qp/physical/crud/InsertPlan.java | 45 ++++++++++++++++++++-- .../iotdb/db/qp/physical/crud/InsertRowPlan.java | 16 ++++++++ .../db/qp/physical/crud/InsertTabletPlan.java | 19 +++++++++ 5 files changed, 119 insertions(+), 20 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 99600b9..a36ca6d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -1454,9 +1454,9 @@ public class MetaGroupMember extends RaftMember { setStorageGroupResult.getCode(), storageGroupName) ); } - if(plan instanceof InsertRowPlan){ + if(plan instanceof InsertPlan){ // try to create timeseries - boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan); + boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertPlan) plan); if (!isAutoCreateTimeseriesSuccess) { throw new MetadataException( "Failed to create timeseries from InsertPlan automatically." @@ -1473,10 +1473,10 @@ public class MetaGroupMember extends RaftMember { * @return */ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) { - InsertRowPlan backup = null; - if (plan instanceof InsertRowPlan) { - backup = (InsertRowPlan) ((InsertRowPlan) plan).clone(); - } +// InsertRowPlan backup = null; +// if (plan instanceof InsertRowPlan) { +// backup = (InsertRowPlan) ((InsertRowPlan) plan).clone(); +// } // the error codes from the groups that cannot execute the plan TSStatus status; if (planGroupMap.size() == 1) { @@ -1492,9 +1492,12 @@ public class MetaGroupMember extends RaftMember { && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode() && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { // try to create timeseries - boolean hasCreate = autoCreateTimeseries(backup); + if(((InsertPlan)plan).getFailedMeasurements() != null){ + ((InsertPlan)plan).transform(); + } + boolean hasCreate = autoCreateTimeseries((InsertPlan) plan); if (hasCreate) { - status = forwardPlan(planGroupMap, backup); + status = forwardPlan(planGroupMap, plan); } else { logger.error("{}, Cannot auto create timeseries.", thisNode); } @@ -1629,7 +1632,7 @@ public class MetaGroupMember extends RaftMember { * @param insertPlan, some of the timeseries in it are not created yet * @return true of all uncreated timeseries are created */ - boolean autoCreateTimeseries(InsertRowPlan insertPlan) { + boolean autoCreateTimeseries(InsertPlan insertPlan) { List<String> seriesList = new ArrayList<>(); String deviceId = insertPlan.getDeviceId(); String storageGroupName; @@ -1651,7 +1654,9 @@ public class MetaGroupMember extends RaftMember { for (String seriesPath : unregisteredSeriesList) { int index = seriesList.indexOf(seriesPath); TSDataType dataType = TypeInferenceUtils - .getPredictedDataType(insertPlan.getValues()[index], true); + .getPredictedDataType(insertPlan instanceof InsertTabletPlan + ? ((Object[]) ((InsertTabletPlan) insertPlan).getColumns()[index])[0] + : ((InsertRowPlan) insertPlan).getValues()[index], true); TSEncoding encoding = getDefaultEncoding(dataType); CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath), diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index c3be451..1395ece 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -71,6 +71,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.DeleteFailedException; import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; @@ -262,7 +263,8 @@ public class PlanExecutor implements IPlanExecutor { (storageGroupName, partitionId) -> storageGroupName.equals(((DeletePartitionPlan) plan).getStorageGroupName()) && p.getPartitionId().contains(partitionId); - StorageEngine.getInstance().removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter); + StorageEngine.getInstance() + .removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter); return true; case CREATE_SCHEMA_SNAPSHOT: operateCreateSnapshot(); @@ -849,8 +851,10 @@ public class PlanExecutor implements IPlanExecutor { } protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan) - throws MetadataException { - return mManager.getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan); + throws MetadataException { + return mManager + .getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(), + insertPlan); } @Override @@ -860,15 +864,33 @@ public class PlanExecutor implements IPlanExecutor { insertRowPlan.setSchemasAndTransferType(schemas); StorageEngine.getInstance().insert(insertRowPlan); if (insertRowPlan.getFailedMeasurements() != null) { - throw new StorageEngineException( - "failed to insert measurements " + insertRowPlan.getFailedMeasurements()); + // check if all path not exist exceptions + List<String> failedPaths = insertRowPlan.getFailedMeasurements(); + List<Exception> exceptions = insertRowPlan.getFailedExceptions(); + boolean isPathNotExistException = true; + for (Exception e : exceptions) { + Throwable curException = e; + while (curException.getCause() != null) { + curException = curException.getCause(); + } + if (!(curException instanceof PathNotExistException)) { + isPathNotExistException = false; + break; + } + } + if (isPathNotExistException) { + throw new PathNotExistException(failedPaths); + } else { + throw new StorageEngineException( + "failed to insert points " + insertRowPlan.getFailedMeasurements()); + } } } catch (StorageEngineException | MetadataException e) { throw new QueryProcessException(e); } finally { // TODO: put lock and unlock in the same block mManager.unlockDeviceReadLock(insertRowPlan.getDeviceId()); - } + } } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 3b4222c..ba89656 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -19,8 +19,12 @@ package org.apache.iotdb.db.qp.physical.crud; +import io.jsonwebtoken.lang.Collections; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.iotdb.db.cost.statistic.Measurement; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -35,7 +39,9 @@ public abstract class InsertPlan extends PhysicalPlan { // record the failed measurements - Map<String, Exception> failedMeasurements; + List<String> failedMeasurements; + List<Exception> failedExceptions; + List<Integer> failedIndices; public InsertPlan(Operator.OperatorType operatorType) { super(false, operatorType); @@ -74,10 +80,14 @@ public abstract class InsertPlan extends PhysicalPlan { this.schemas = schemas; } - public Map<String, Exception> getFailedMeasurements() { + public List<String> getFailedMeasurements() { return failedMeasurements; } + public List<Exception> getFailedExceptions() { + return failedExceptions; + } + public int getFailedMeasurementNumber() { return failedMeasurements == null ? 0 : failedMeasurements.size(); } @@ -88,11 +98,38 @@ public abstract class InsertPlan extends PhysicalPlan { */ public void markFailedMeasurementInsertion(int index, Exception e) { if (failedMeasurements == null) { - failedMeasurements = new HashMap<>(); + failedMeasurements = new ArrayList<>(); + failedExceptions = new ArrayList<>(); + failedIndices = new ArrayList<>(); } - failedMeasurements.put(measurements[index], e); + failedMeasurements.add(measurements[index]); + failedExceptions.add(e); + failedIndices.add(index); measurements[index] = null; dataTypes[index] = null; } + public InsertPlan transform() { + if (failedMeasurements == null) { + return null; + } + measurements = failedMeasurements.toArray(new String[0]); + failedMeasurements = null; + if(dataTypes != null){ + TSDataType[] temp = dataTypes.clone(); + dataTypes = new TSDataType[failedIndices.size()]; + for(int i = 0; i < failedIndices.size(); i++){ + dataTypes[i] = temp[failedIndices.get(i)]; + } + } + if(schemas != null){ + MeasurementSchema[] temp = schemas.clone(); + schemas = new MeasurementSchema[failedIndices.size()]; + for(int i = 0; i < failedIndices.size(); i++){ + schemas[i] = temp[failedIndices.get(i)]; + } + } + return this; + } + } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java index 99b179e..82acd24 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java @@ -57,6 +57,8 @@ public class InsertRowPlan extends InsertPlan { // if values is object[], we could use the raw type of them, and we should set this to false private boolean isNeedInferType = false; + private List<Object> failedValues; + public InsertRowPlan() { super(OperatorType.INSERT); } @@ -190,6 +192,10 @@ public class InsertRowPlan extends InsertPlan { @Override public void markFailedMeasurementInsertion(int index, Exception e) { super.markFailedMeasurementInsertion(index, e); + if (failedValues == null) { + failedValues = new ArrayList<>(); + } + failedValues.add(values[index]); values[index] = null; } @@ -452,4 +458,14 @@ public class InsertRowPlan extends InsertPlan { System.arraycopy(this.dataTypes, 0, typesClone, 0, typesClone.length); return new InsertRowPlan(deviceIdClone, timeClone, measurementsClone, typesClone, valuesClone); } + + @Override + public InsertPlan transform() { + if (super.transform() == null) { + return null; + } + values = failedValues.toArray(new Object[0]); + failedValues = null; + return this; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java index 3780365..da4347b 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java @@ -62,6 +62,8 @@ public class InsertTabletPlan extends InsertPlan { private int end; private List<Integer> range; + private List<Object> failedColumns; + public InsertTabletPlan() { super(OperatorType.BATCHINSERT); @@ -501,9 +503,25 @@ public class InsertTabletPlan extends InsertPlan { public void markFailedMeasurementInsertion(int index, Exception e) { super.markFailedMeasurementInsertion(index, e); + if (failedColumns == null) { + failedColumns = new ArrayList<>(); + } + failedColumns.add(columns[index]); columns[index] = null; } + + @Override + public InsertPlan transform() { + if (super.transform() == null) { + return null; + } + // TODO anything else? + columns = failedColumns.toArray(new Object[0]); + failedColumns = null; + return this; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -532,4 +550,5 @@ public class InsertTabletPlan extends InsertPlan { result = 31 * result + Arrays.hashCode(times); return result; } + }
