This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a37500f2f9a624ae4b220b76dad63b8b4ecdda88
Author: Marccos <[email protected]>
AuthorDate: Wed Dec 14 10:13:33 2022 +0800

    improve code structure of fetch schema for data insert and auto create schema
---
 .../analyze/schema/AutoCreateSchemaExecutor.java   |  99 +++++++++++++++-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  | 127 ++++++---------------
 2 files changed, 134 insertions(+), 92 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
index b5389698b5..b521a355e7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -25,36 +25,129 @@ import 
org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import 
org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
 class AutoCreateSchemaExecutor {
 
   private final Function<Statement, ExecutionResult> statementExecutor;
+  private final Function<PartialPath, Pair<Template, PartialPath>> 
templateSetInfoProvider;
 
-  AutoCreateSchemaExecutor(Function<Statement, ExecutionResult> 
statementExecutor) {
+  AutoCreateSchemaExecutor(
+      Function<Statement, ExecutionResult> statementExecutor,
+      Function<PartialPath, Pair<Template, PartialPath>> 
templateSetInfoProvider) {
     this.statementExecutor = statementExecutor;
+    this.templateSetInfoProvider = templateSetInfoProvider;
+  }
+
+  /**
+   * Creates the schema that is still missing for the given device and merges it into
+   * {@code schemaTree}.
+   *
+   * <p>If {@code templateSetInfoProvider} returns template info for the device and the template
+   * covers at least one missing measurement, the template is activated on the device and every
+   * measurement of the template is appended to {@code schemaTree}. Missing measurements that
+   * the template does not cover are then created through {@code internalCreateTimeseries}.
+   *
+   * @param schemaTree schema tree that receives the newly available measurements
+   * @param devicePath path of the device the measurements belong to
+   * @param indexOfMissingMeasurements indexes into {@code measurements} of the missing ones
+   * @param measurements measurement names, addressed by the indexes above
+   * @param getDataType maps an index to its data type; a {@code null} result means the
+   *     measurement is skipped instead of created
+   * @param encodings per-index encodings, or {@code null} to use the default of the data type
+   * @param compressionTypes per-index compression types, or {@code null} to use the configured
+   *     compressor
+   * @param isAligned passed through to {@code internalCreateTimeseries}
+   */
+  void autoCreateSchema(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfMissingMeasurements,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      TSEncoding[] encodings,
+      CompressionType[] compressionTypes,
+      boolean isAligned) {
+    // check whether there is a template that should be activated
+    Pair<Template, PartialPath> templateInfo = templateSetInfoProvider.apply(devicePath);
+    if (templateInfo != null) {
+      Template template = templateInfo.left;
+      boolean shouldActivateTemplate = false;
+      for (int index : indexOfMissingMeasurements) {
+        if (template.hasSchema(measurements[index])) {
+          shouldActivateTemplate = true;
+          break;
+        }
+      }
+
+      if (shouldActivateTemplate) {
+        internalActivateTemplate(devicePath);
+        // Keep only the measurements the template does not provide. The list holds indexes
+        // into measurements, so the element itself (not its position in the list) must be
+        // used for the lookup.
+        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
+        for (int index : indexOfMissingMeasurements) {
+          if (!template.hasSchema(measurements[index])) {
+            recheckedIndexOfMissingMeasurements.add(index);
+          }
+        }
+        indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
+        for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
+          schemaTree.appendSingleMeasurement(
+              devicePath.concatNode(entry.getKey()),
+              (MeasurementSchema) entry.getValue(),
+              null,
+              null,
+              template.isDirectAligned());
+        }
+
+        if (indexOfMissingMeasurements.isEmpty()) {
+          return;
+        }
+      }
+    }
+
+    // auto create the remaining missing timeseries
+    List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
+    List<TSDataType> dataTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfMissingMeasurements.size());
+    List<TSEncoding> encodingsOfMissingMeasurement =
+        new ArrayList<>(indexOfMissingMeasurements.size());
+    List<CompressionType> compressionTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfMissingMeasurements.size());
+    indexOfMissingMeasurements.forEach(
+        index -> {
+          TSDataType tsDataType = getDataType.apply(index);
+          // tsDataType == null means a null value is inserted into a non-existent series;
+          // creation of such series is skipped
+          if (tsDataType != null) {
+            missingMeasurements.add(measurements[index]);
+            dataTypesOfMissingMeasurement.add(tsDataType);
+            encodingsOfMissingMeasurement.add(
+                encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]);
+            compressionTypesOfMissingMeasurement.add(
+                compressionTypes == null
+                    ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+                    : compressionTypes[index]);
+          }
+        });
+
+    if (!missingMeasurements.isEmpty()) {
+      schemaTree.mergeSchemaTree(
+          internalCreateTimeseries(
+              devicePath,
+              missingMeasurements,
+              dataTypesOfMissingMeasurement,
+              encodingsOfMissingMeasurement,
+              compressionTypesOfMissingMeasurement,
+              isAligned));
+    }
+  }
 
   // try to create the target timeseries and return schemaTree involving 
successfully created
   // timeseries and existing timeseries
-  ClusterSchemaTree internalCreateTimeseries(
+  private ClusterSchemaTree internalCreateTimeseries(
       PartialPath devicePath,
       List<String> measurements,
       List<TSDataType> tsDataTypes,
@@ -125,7 +218,7 @@ class AutoCreateSchemaExecutor {
     return alreadyExistingMeasurements;
   }
 
-  void internalActivateTemplate(PartialPath devicePath) {
+  private void internalActivateTemplate(PartialPath devicePath) {
     ExecutionResult executionResult =
         statementExecutor.apply(new ActivateTemplateStatement(devicePath));
     TSStatus status = executionResult.status;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 3f88d88825..4d5a7e498d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -34,12 +34,10 @@ import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
@@ -53,8 +51,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
 public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -73,7 +69,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
                   "",
                   ClusterPartitionFetcher.getInstance(),
                   this,
-                  config.getQueryTimeoutThreshold()));
+                  config.getQueryTimeoutThreshold()),
+          templateManager::checkTemplateSetInfo);
   private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
       new ClusterSchemaFetchExecutor(
           coordinator,
@@ -102,15 +99,17 @@ public class ClusterSchemaFetcher implements 
ISchemaFetcher {
 
   @Override
   public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, false);
+    return checkPatternTreeAndFetchSchema(patternTree, false);
   }
 
   @Override
   public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, true);
+    return checkPatternTreeAndFetchSchema(patternTree, true);
   }
 
-  private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean 
withTags) {
+  // used for patternTree that may have wildcard, mainly for data query
+  private ClusterSchemaTree checkPatternTreeAndFetchSchema(
+      PathPatternTree patternTree, boolean withTags) {
     Map<Integer, Template> templateMap = new HashMap<>();
     patternTree.constructTree();
     List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
@@ -208,7 +207,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
       for (int index : indexOfMissingMeasurements) {
         patternTree.appendFullPath(devicePath, measurements[index]);
       }
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
+      ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree);
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
         schemaCache.put(remoteSchemaTree);
@@ -276,7 +275,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
       }
 
       // try fetch the missing schema from remote and cache fetched schema
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
+      ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree);
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
         schemaCache.put(remoteSchemaTree);
@@ -320,6 +319,18 @@ public class ClusterSchemaFetcher implements 
ISchemaFetcher {
     return templateManager.getAllPathsSetTemplate(templateName);
   }
 
+  /**
+   * Runs a schema fetch query for the given pattern tree. Intended for a patternTree without
+   * wildcard, mainly for data insert and load tsFile.
+   *
+   * <p>The tree is constructed first, then the templates reported by {@code templateManager}
+   * for each of its path patterns are collected and handed to the fetch statement together
+   * with the tree. Tags are not requested.
+   *
+   * @param patternTree pattern tree describing the paths to fetch
+   * @return the schema tree returned by {@code clusterSchemaFetchExecutor}
+   */
+  private ClusterSchemaTree fetchSchemaFromRemote(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    Map<Integer, Template> relatedTemplates = new HashMap<>();
+    patternTree
+        .getAllPathPatterns()
+        .forEach(
+            pathPattern ->
+                relatedTemplates.putAll(templateManager.checkAllRelatedTemplate(pathPattern)));
+    SchemaFetchStatement fetchStatement =
+        new SchemaFetchStatement(patternTree, relatedTemplates, false);
+    return clusterSchemaFetchExecutor.executeSchemaFetchQuery(fetchStatement);
+  }
+
   // check which measurements are missing and auto create the missing 
measurements and merge them
   // into given schemaTree
   private void checkAndAutoCreateMissingMeasurements(
@@ -338,90 +349,28 @@ public class ClusterSchemaFetcher implements 
ISchemaFetcher {
             indexOfMissingMeasurements.stream()
                 .map(index -> measurements[index])
                 .collect(Collectors.toList()));
-    if (deviceSchemaInfo != null) {
+    List<Integer> recheckedIndexOfMissingMeasurements;
+    if (deviceSchemaInfo == null) {
+      recheckedIndexOfMissingMeasurements = indexOfMissingMeasurements;
+    } else {
+      recheckedIndexOfMissingMeasurements = new 
ArrayList<>(indexOfMissingMeasurements.size());
       List<MeasurementSchema> schemaList = 
deviceSchemaInfo.getMeasurementSchemaList();
-      int removedCount = 0;
       for (int i = 0, size = schemaList.size(); i < size; i++) {
-        if (schemaList.get(i) != null) {
-          indexOfMissingMeasurements.remove(i - removedCount);
-          removedCount++;
+        if (schemaList.get(i) == null) {
+          
recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
         }
       }
     }
-    if (indexOfMissingMeasurements.isEmpty()) {
-      return;
-    }
-
-    // check whether there is template should be activated
-    Pair<Template, PartialPath> templateInfo = 
templateManager.checkTemplateSetInfo(devicePath);
-    if (templateInfo != null) {
-      Template template = templateInfo.left;
-      boolean shouldActivateTemplate = false;
-      for (int index : indexOfMissingMeasurements) {
-        if (template.hasSchema(measurements[index])) {
-          shouldActivateTemplate = true;
-          break;
-        }
-      }
-
-      if (shouldActivateTemplate) {
-        autoCreateSchemaExecutor.internalActivateTemplate(devicePath);
-        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
-        for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
-          if (!template.hasSchema(measurements[i])) {
-            
recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
-          }
-        }
-        indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
-        for (Map.Entry<String, IMeasurementSchema> entry : 
template.getSchemaMap().entrySet()) {
-          schemaTree.appendSingleMeasurement(
-              devicePath.concatNode(entry.getKey()),
-              (MeasurementSchema) entry.getValue(),
-              null,
-              null,
-              template.isDirectAligned());
-        }
-
-        if (indexOfMissingMeasurements.isEmpty()) {
-          return;
-        }
-      }
-    }
-
-    // auto create the rest missing timeseries
-    List<String> missingMeasurements = new 
ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSDataType> dataTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSEncoding> encodingsOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<CompressionType> compressionTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    indexOfMissingMeasurements.forEach(
-        index -> {
-          TSDataType tsDataType = getDataType.apply(index);
-          // tsDataType == null means insert null value to a non-exist series
-          // should skip creating them
-          if (tsDataType != null) {
-            missingMeasurements.add(measurements[index]);
-            dataTypesOfMissingMeasurement.add(tsDataType);
-            encodingsOfMissingMeasurement.add(
-                encodings == null ? getDefaultEncoding(tsDataType) : 
encodings[index]);
-            compressionTypesOfMissingMeasurement.add(
-                compressionTypes == null
-                    ? 
TSFileDescriptor.getInstance().getConfig().getCompressor()
-                    : compressionTypes[index]);
-          }
-        });
-
-    if (!missingMeasurements.isEmpty()) {
-      schemaTree.mergeSchemaTree(
-          autoCreateSchemaExecutor.internalCreateTimeseries(
-              devicePath,
-              missingMeasurements,
-              dataTypesOfMissingMeasurement,
-              encodingsOfMissingMeasurement,
-              compressionTypesOfMissingMeasurement,
-              isAligned));
+    if (!recheckedIndexOfMissingMeasurements.isEmpty()) {
+      autoCreateSchemaExecutor.autoCreateSchema(
+          schemaTree,
+          devicePath,
+          recheckedIndexOfMissingMeasurements,
+          measurements,
+          getDataType,
+          encodings,
+          compressionTypes,
+          isAligned);
     }
   }
 

Reply via email to