This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch concurrent_schema_fetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a37500f2f9a624ae4b220b76dad63b8b4ecdda88 Author: Marccos <[email protected]> AuthorDate: Wed Dec 14 10:13:33 2022 +0800 improve code structure of fetch schema for data insert and auto create schema --- .../analyze/schema/AutoCreateSchemaExecutor.java | 99 +++++++++++++++- .../plan/analyze/schema/ClusterSchemaFetcher.java | 127 ++++++--------------- 2 files changed, 134 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java index b5389698b5..b521a355e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java @@ -25,36 +25,129 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree; import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import 
java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + class AutoCreateSchemaExecutor { private final Function<Statement, ExecutionResult> statementExecutor; + private final Function<PartialPath, Pair<Template, PartialPath>> templateSetInfoProvider; - AutoCreateSchemaExecutor(Function<Statement, ExecutionResult> statementExecutor) { + AutoCreateSchemaExecutor( + Function<Statement, ExecutionResult> statementExecutor, + Function<PartialPath, Pair<Template, PartialPath>> templateSetInfoProvider) { this.statementExecutor = statementExecutor; + this.templateSetInfoProvider = templateSetInfoProvider; + } + + void autoCreateSchema( + ClusterSchemaTree schemaTree, + PartialPath devicePath, + List<Integer> indexOfMissingMeasurements, + String[] measurements, + Function<Integer, TSDataType> getDataType, + TSEncoding[] encodings, + CompressionType[] compressionTypes, + boolean isAligned) { + // check whether there is template should be activated + Pair<Template, PartialPath> templateInfo = templateSetInfoProvider.apply(devicePath); + if (templateInfo != null) { + Template template = templateInfo.left; + boolean shouldActivateTemplate = false; + for (int index : indexOfMissingMeasurements) { + if (template.hasSchema(measurements[index])) { + shouldActivateTemplate = true; + break; + } + } + + if (shouldActivateTemplate) { + internalActivateTemplate(devicePath); + List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>(); + for (int i = 0; i < indexOfMissingMeasurements.size(); i++) { + if (!template.hasSchema(measurements[i])) { + recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i)); + } + } + indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements; + for (Map.Entry<String, 
IMeasurementSchema> entry : template.getSchemaMap().entrySet()) { + schemaTree.appendSingleMeasurement( + devicePath.concatNode(entry.getKey()), + (MeasurementSchema) entry.getValue(), + null, + null, + template.isDirectAligned()); + } + + if (indexOfMissingMeasurements.isEmpty()) { + return; + } + } + } + + // auto create the rest missing timeseries + List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size()); + List<TSDataType> dataTypesOfMissingMeasurement = + new ArrayList<>(indexOfMissingMeasurements.size()); + List<TSEncoding> encodingsOfMissingMeasurement = + new ArrayList<>(indexOfMissingMeasurements.size()); + List<CompressionType> compressionTypesOfMissingMeasurement = + new ArrayList<>(indexOfMissingMeasurements.size()); + indexOfMissingMeasurements.forEach( + index -> { + TSDataType tsDataType = getDataType.apply(index); + // tsDataType == null means insert null value to a non-exist series + // should skip creating them + if (tsDataType != null) { + missingMeasurements.add(measurements[index]); + dataTypesOfMissingMeasurement.add(tsDataType); + encodingsOfMissingMeasurement.add( + encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]); + compressionTypesOfMissingMeasurement.add( + compressionTypes == null + ? 
TSFileDescriptor.getInstance().getConfig().getCompressor() + : compressionTypes[index]); + } + }); + + if (!missingMeasurements.isEmpty()) { + schemaTree.mergeSchemaTree( + internalCreateTimeseries( + devicePath, + missingMeasurements, + dataTypesOfMissingMeasurement, + encodingsOfMissingMeasurement, + compressionTypesOfMissingMeasurement, + isAligned)); + } } // try to create the target timeseries and return schemaTree involving successfully created // timeseries and existing timeseries - ClusterSchemaTree internalCreateTimeseries( + private ClusterSchemaTree internalCreateTimeseries( PartialPath devicePath, List<String> measurements, List<TSDataType> tsDataTypes, @@ -125,7 +218,7 @@ class AutoCreateSchemaExecutor { return alreadyExistingMeasurements; } - void internalActivateTemplate(PartialPath devicePath) { + private void internalActivateTemplate(PartialPath devicePath) { ExecutionResult executionResult = statementExecutor.apply(new ActivateTemplateStatement(devicePath)); TSStatus status = executionResult.status; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java index 3f88d88825..4d5a7e498d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java @@ -34,12 +34,10 @@ import org.apache.iotdb.db.mpp.plan.Coordinator; import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement; import org.apache.iotdb.db.query.control.SessionManager; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import 
org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.util.ArrayList; @@ -53,8 +51,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; - public class ClusterSchemaFetcher implements ISchemaFetcher { private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -73,7 +69,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { "", ClusterPartitionFetcher.getInstance(), this, - config.getQueryTimeoutThreshold())); + config.getQueryTimeoutThreshold()), + templateManager::checkTemplateSetInfo); private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor = new ClusterSchemaFetchExecutor( coordinator, @@ -102,15 +99,17 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { @Override public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) { - return fetchSchema(patternTree, false); + return checkPatternTreeAndFetchSchema(patternTree, false); } @Override public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) { - return fetchSchema(patternTree, true); + return checkPatternTreeAndFetchSchema(patternTree, true); } - private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) { + // used for patternTree that may have wildcard, mainly for data query + private ClusterSchemaTree checkPatternTreeAndFetchSchema( + PathPatternTree patternTree, boolean withTags) { Map<Integer, Template> templateMap = new HashMap<>(); patternTree.constructTree(); List<PartialPath> pathPatternList = patternTree.getAllPathPatterns(); @@ -208,7 +207,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { for (int index : indexOfMissingMeasurements) { patternTree.appendFullPath(devicePath, measurements[index]); } - ClusterSchemaTree 
remoteSchemaTree = fetchSchema(patternTree); + ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree); if (!remoteSchemaTree.isEmpty()) { schemaTree.mergeSchemaTree(remoteSchemaTree); schemaCache.put(remoteSchemaTree); @@ -276,7 +275,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } // try fetch the missing schema from remote and cache fetched schema - ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree); + ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree); if (!remoteSchemaTree.isEmpty()) { schemaTree.mergeSchemaTree(remoteSchemaTree); schemaCache.put(remoteSchemaTree); @@ -320,6 +319,18 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { return templateManager.getAllPathsSetTemplate(templateName); } + // used for patternTree without wildcard, mainly for data insert and load tsFile + private ClusterSchemaTree fetchSchemaFromRemote(PathPatternTree patternTree) { + Map<Integer, Template> templateMap = new HashMap<>(); + patternTree.constructTree(); + List<PartialPath> pathPatternList = patternTree.getAllPathPatterns(); + for (PartialPath pattern : pathPatternList) { + templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern)); + } + return clusterSchemaFetchExecutor.executeSchemaFetchQuery( + new SchemaFetchStatement(patternTree, templateMap, false)); + } + // check which measurements are missing and auto create the missing measurements and merge them // into given schemaTree private void checkAndAutoCreateMissingMeasurements( @@ -338,90 +349,28 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { indexOfMissingMeasurements.stream() .map(index -> measurements[index]) .collect(Collectors.toList())); - if (deviceSchemaInfo != null) { + List<Integer> recheckedIndexOfMissingMeasurements; + if (deviceSchemaInfo == null) { + recheckedIndexOfMissingMeasurements = indexOfMissingMeasurements; + } else { + recheckedIndexOfMissingMeasurements = new 
ArrayList<>(indexOfMissingMeasurements.size()); List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList(); - int removedCount = 0; for (int i = 0, size = schemaList.size(); i < size; i++) { - if (schemaList.get(i) != null) { - indexOfMissingMeasurements.remove(i - removedCount); - removedCount++; + if (schemaList.get(i) == null) { + recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i)); } } } - if (indexOfMissingMeasurements.isEmpty()) { - return; - } - - // check whether there is template should be activated - Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath); - if (templateInfo != null) { - Template template = templateInfo.left; - boolean shouldActivateTemplate = false; - for (int index : indexOfMissingMeasurements) { - if (template.hasSchema(measurements[index])) { - shouldActivateTemplate = true; - break; - } - } - - if (shouldActivateTemplate) { - autoCreateSchemaExecutor.internalActivateTemplate(devicePath); - List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>(); - for (int i = 0; i < indexOfMissingMeasurements.size(); i++) { - if (!template.hasSchema(measurements[i])) { - recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i)); - } - } - indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements; - for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) { - schemaTree.appendSingleMeasurement( - devicePath.concatNode(entry.getKey()), - (MeasurementSchema) entry.getValue(), - null, - null, - template.isDirectAligned()); - } - - if (indexOfMissingMeasurements.isEmpty()) { - return; - } - } - } - - // auto create the rest missing timeseries - List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size()); - List<TSDataType> dataTypesOfMissingMeasurement = - new ArrayList<>(indexOfMissingMeasurements.size()); - List<TSEncoding> encodingsOfMissingMeasurement = - new 
ArrayList<>(indexOfMissingMeasurements.size()); - List<CompressionType> compressionTypesOfMissingMeasurement = - new ArrayList<>(indexOfMissingMeasurements.size()); - indexOfMissingMeasurements.forEach( - index -> { - TSDataType tsDataType = getDataType.apply(index); - // tsDataType == null means insert null value to a non-exist series - // should skip creating them - if (tsDataType != null) { - missingMeasurements.add(measurements[index]); - dataTypesOfMissingMeasurement.add(tsDataType); - encodingsOfMissingMeasurement.add( - encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]); - compressionTypesOfMissingMeasurement.add( - compressionTypes == null - ? TSFileDescriptor.getInstance().getConfig().getCompressor() - : compressionTypes[index]); - } - }); - - if (!missingMeasurements.isEmpty()) { - schemaTree.mergeSchemaTree( - autoCreateSchemaExecutor.internalCreateTimeseries( - devicePath, - missingMeasurements, - dataTypesOfMissingMeasurement, - encodingsOfMissingMeasurement, - compressionTypesOfMissingMeasurement, - isAligned)); + if (!recheckedIndexOfMissingMeasurements.isEmpty()) { + autoCreateSchemaExecutor.autoCreateSchema( + schemaTree, + devicePath, + recheckedIndexOfMissingMeasurements, + measurements, + getDataType, + encodings, + compressionTypes, + isAligned); } }
