This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch concurrent_schema_fetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b5f5b028b5a0987032a306b2a3c4ab0f57bc5337 Author: Marccos <[email protected]> AuthorDate: Wed Dec 14 15:32:02 2022 +0800 save trial --- .../analyze/schema/ClusterSchemaFetchExecutor.java | 73 +++++++++++++++++++++- .../plan/analyze/schema/ClusterSchemaFetcher.java | 9 +-- 2 files changed, 77 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 9b0d3d7c81..75fd5623ac 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.plan.analyze.schema; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree; import org.apache.iotdb.db.mpp.plan.Coordinator; import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; @@ -31,15 +34,22 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; class ClusterSchemaFetchExecutor { @@ -47,14 +57,75 @@ class ClusterSchemaFetchExecutor { private final Coordinator coordinator; private final Supplier<Long> queryIdProvider; private final BiFunction<Long, Statement, ExecutionResult> statementExecutor; + private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider; + + private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>(); + + private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>(); + + private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>(); ClusterSchemaFetchExecutor( Coordinator coordinator, Supplier<Long> queryIdProvider, - BiFunction<Long, Statement, ExecutionResult> statementExecutor) { + BiFunction<Long, Statement, ExecutionResult> statementExecutor, + Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider) { this.coordinator = coordinator; this.queryIdProvider = queryIdProvider; this.statementExecutor = statementExecutor; + this.templateSetInfoProvider = templateSetInfoProvider; + } + + ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){ + + final AtomicBoolean shouldWait = new AtomicBoolean(true); + executingTaskMap.compute(devicePath, (key, value) -> { + if (value == null){ + return null; + } + if (value.right.size() < measurements.size()){ + shouldWait.set(false); + return value; + } + for (String measurement: measurements){ + if (!value.right.contains(measurement)){ + shouldWait.set(false); + return value; + } + } + value.left.getAndIncrement(); + return value; + }); + if (shouldWait.get()){ + while (!fetchedResultMap.containsKey(devicePath)); + ClusterSchemaTree schemaTree = new ClusterSchemaTree(); + Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath); + if (pair.left.decrementAndGet() == 0){ + fetchedResultMap.remove(devicePath); + } + }else { + Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> { + if (value == null){ + value = new Pair<>(new AtomicInteger(0), new HashSet<>()); + } + value.left.getAndIncrement(); + value.right.addAll(measurements); + return value; + }); + synchronized (task){ + while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task); + PathPatternTree patternTree = new PathPatternTree(); + for (String measurement: task.right){ + patternTree.appendFullPath(devicePath, measurement); + } + ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false)); + Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree); + while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult); + + } + } + + return null; } ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java index 4d5a7e498d..564b97bfba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java @@ -83,7 +83,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { "", ClusterPartitionFetcher.getInstance(), this, - config.getQueryTimeoutThreshold())); + config.getQueryTimeoutThreshold()), + templateManager::checkAllRelatedTemplate); private static final class ClusterSchemaFetcherHolder { private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher(); @@ -119,7 +120,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { if (withTags) { return clusterSchemaFetchExecutor.executeSchemaFetchQuery( - new SchemaFetchStatement(patternTree, templateMap, withTags)); + new SchemaFetchStatement(patternTree, templateMap, true)); } List<PartialPath> fullPathList = new ArrayList<>(); @@ -131,7 +132,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { if (fullPathList.isEmpty()) { return clusterSchemaFetchExecutor.executeSchemaFetchQuery( - new SchemaFetchStatement(patternTree, templateMap, withTags)); + new SchemaFetchStatement(patternTree, templateMap, false)); } // The schema cache R/W and fetch operation must be locked together thus the cache clean @@ -165,7 +166,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { schemaTree = clusterSchemaFetchExecutor.executeSchemaFetchQuery( - new SchemaFetchStatement(patternTree, templateMap, withTags)); + new SchemaFetchStatement(patternTree, templateMap, false)); // only cache the schema fetched by full path List<MeasurementPath> measurementPathList;
