This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch concurrent_schema_fetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2b069950bc07c51c50952bb2b63d610505cac49a Author: Marccos <[email protected]> AuthorDate: Wed Dec 14 16:09:43 2022 +0800 save trial --- .../analyze/schema/ClusterSchemaFetchExecutor.java | 128 ++++++++++++++------- .../plan/analyze/schema/ClusterSchemaFetcher.java | 2 +- 2 files changed, 86 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 75fd5623ac..2008dadaeb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -59,11 +59,13 @@ class ClusterSchemaFetchExecutor { private final BiFunction<Long, Statement, ExecutionResult> statementExecutor; private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider; - private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>(); + private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = + new ConcurrentHashMap<>(); - private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>(); + private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap = + new ConcurrentHashMap<>(); - private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>(); + private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>(); ClusterSchemaFetchExecutor( Coordinator coordinator, @@ -76,54 +78,69 @@ class ClusterSchemaFetchExecutor { this.templateSetInfoProvider = templateSetInfoProvider; } - ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){ + ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) { final AtomicBoolean shouldWait = new AtomicBoolean(true); - executingTaskMap.compute(devicePath, (key, value) -> { - if (value == null){ - return null; - } - if (value.right.size() < measurements.size()){ - shouldWait.set(false); - return value; - } - for (String measurement: measurements){ - if (!value.right.contains(measurement)){ - shouldWait.set(false); + executingTaskMap.compute( + devicePath, + (key, value) -> { + if (value == null) { + return null; + } + shouldWait.set(value.checkAndAddWaitingThread(measurements)); return value; + }); + + if (!shouldWait.get()) { + DeviceSchemaFetchTask task = + waitingTaskMap.compute( + devicePath, + (key, value) -> { + if (value == null) { + value = new DeviceSchemaFetchTask(); + } + value.addWaitingThread(measurements); + return value; + }); + if (executingTaskMap.get(devicePath) != task) { + synchronized (task) { + if (executingTaskMap.get(devicePath) != task) { + while (true) { + if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) { + break; + } + } + PathPatternTree patternTree = new PathPatternTree(); + for (String measurement : task.measurementSet) { + patternTree.appendFullPath(devicePath, measurement); + } + ClusterSchemaTree fetchedSchemaTree = + executeSchemaFetchQuery( + new SchemaFetchStatement( + patternTree, templateSetInfoProvider.apply(devicePath), false)); + Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = + new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree); + while (true) { + if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) + != fetchSchemaResult) { + break; + } + } + } } } - value.left.getAndIncrement(); - return value; - }); - if (shouldWait.get()){ - while (!fetchedResultMap.containsKey(devicePath)); - ClusterSchemaTree schemaTree = new ClusterSchemaTree(); - Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath); - if (pair.left.decrementAndGet() == 0){ - fetchedResultMap.remove(devicePath); - } - }else { - Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> { - if (value == null){ - value = new Pair<>(new AtomicInteger(0), new HashSet<>()); - } - value.left.getAndIncrement(); - value.right.addAll(measurements); - return value; - }); - synchronized (task){ - while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task); - PathPatternTree patternTree = new PathPatternTree(); - for (String measurement: task.right){ - patternTree.appendFullPath(devicePath, measurement); - } - ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false)); - Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree); - while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult); + } + while (true) { + if (!fetchedResultMap.containsKey(devicePath)) { + break; } } + ClusterSchemaTree schemaTree = new ClusterSchemaTree(); + Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath); + if (pair.left.decrementAndGet() == 0) { + fetchedResultMap.remove(devicePath); + } return null; } @@ -186,4 +203,29 @@ class ClusterSchemaFetchExecutor { // Totally memory operation. This case won't happen. } } + + private static class DeviceSchemaFetchTask { + + private int waitingThreadNum = 0; + + private final Set<String> measurementSet = new HashSet<>(); + + private boolean checkAndAddWaitingThread(List<String> measurements) { + if (measurementSet.size() < measurements.size()) { + return false; + } + for (String measurement : measurements) { + if (!measurementSet.contains(measurement)) { + return false; + } + } + waitingThreadNum++; + return true; + } + + private void addWaitingThread(List<String> measurements) { + waitingThreadNum++; + measurementSet.addAll(measurements); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java index 564b97bfba..52eabcdcc6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java @@ -84,7 +84,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { ClusterPartitionFetcher.getInstance(), this, config.getQueryTimeoutThreshold()), - templateManager::checkAllRelatedTemplate); + templateManager::checkAllRelatedTemplate); private static final class ClusterSchemaFetcherHolder { private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
