This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch concurrent_schema_fetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac49e5c19cbfca2f753aa52ebcf9210db0c70532 Author: Marccos <[email protected]> AuthorDate: Thu Dec 15 14:42:38 2022 +0800 basically implement --- .../mpp/common/schematree/ClusterSchemaTree.java | 40 ++++ .../analyze/schema/ClusterSchemaFetchExecutor.java | 222 +++++++++++---------- .../plan/analyze/schema/ClusterSchemaFetcher.java | 27 ++- 3 files changed, 180 insertions(+), 109 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java index d1581176ca..9556086fab 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java @@ -325,4 +325,44 @@ public class ClusterSchemaTree implements ISchemaTree { public boolean isEmpty() { return root.getChildren() == null || root.getChildren().size() == 0; } + + public ClusterSchemaTree extractDeviceSubTree(PartialPath devicePath, List<String> measurements) { + SchemaNode root = new SchemaInternalNode(PATH_ROOT); + String[] nodes = devicePath.getNodes(); + SchemaNode cur = root; + SchemaNode clonedCur = root; + SchemaNode clonedChild; + for (int i = 1; i < nodes.length - 1; i++) { + cur = cur.getChild(nodes[i]); + if (cur == null) { + return new ClusterSchemaTree(); + } + + clonedChild = new SchemaInternalNode(cur.getName()); + clonedCur.addChild(nodes[i], clonedChild); + clonedCur = clonedChild; + } + + cur = cur.getChild(nodes[nodes.length - 1]); + if (cur == null || !cur.isEntity()) { + return new ClusterSchemaTree(); + } + + SchemaEntityNode deviceNode = cur.getAsEntityNode(); + SchemaEntityNode clonedDeviceNode = new SchemaEntityNode(nodes[nodes.length - 1]); + clonedDeviceNode.setAligned(deviceNode.isAligned()); + SchemaNode child; + SchemaMeasurementNode measurementNode; + for (String measurement : measurements) { + child = deviceNode.getChild(measurement); + if (child.isMeasurement()) { + measurementNode = child.getAsMeasurementNode(); + clonedDeviceNode.addChild(measurementNode.getName(), measurementNode); + if (measurementNode.getAlias() != null) { + clonedDeviceNode.addAliasChild(measurementNode.getAlias(), measurementNode); + } + } + } + return new ClusterSchemaTree(root); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 7f25b2f640..a76f1a23c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -34,7 +34,6 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.ByteArrayInputStream; @@ -46,8 +45,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; @@ -61,14 +58,9 @@ class ClusterSchemaFetchExecutor { private final BiFunction<Long, Statement, ExecutionResult> statementExecutor; private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider; - private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = + private final Map<PartialPath, DeviceSchemaFetchTaskExecutor> deviceSchemaFetchTaskExecutorMap = new ConcurrentHashMap<>(); - private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap = - new ConcurrentHashMap<>(); - - private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>(); - ClusterSchemaFetchExecutor( Coordinator coordinator, Supplier<Long> queryIdProvider, @@ -81,69 +73,27 @@ class ClusterSchemaFetchExecutor { } ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) { - final AtomicBoolean shouldWait = new AtomicBoolean(true); - executingTaskMap.compute( + ClusterSchemaTree result = + deviceSchemaFetchTaskExecutorMap + .compute( + devicePath, + (key, value) -> { + if (value == null) { + value = new DeviceSchemaFetchTaskExecutor(); + value.incReferenceCount(); + } + return value; + }) + .execute(devicePath, measurements); + deviceSchemaFetchTaskExecutorMap.compute( devicePath, (key, value) -> { - if (value == null) { + if (value == null || value.decAndGetReferenceCount() == 0) { return null; } - shouldWait.set(value.checkAndAddWaitingThread(measurements)); return value; }); - - if (!shouldWait.get()) { - DeviceSchemaFetchTask task = - waitingTaskMap.compute( - devicePath, - (key, value) -> { - if (value == null) { - value = new DeviceSchemaFetchTask(); - } - value.addWaitingThread(measurements); - return value; - }); - if (executingTaskMap.get(devicePath) != task) { - synchronized (task) { - if (executingTaskMap.get(devicePath) != task) { - while (true) { - if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) { - break; - } - } - PathPatternTree patternTree = new PathPatternTree(); - for (String measurement : task.measurementSet) { - patternTree.appendFullPath(devicePath, measurement); - } - ClusterSchemaTree fetchedSchemaTree = - executeSchemaFetchQuery( - new SchemaFetchStatement( - patternTree, templateSetInfoProvider.apply(devicePath), false)); - Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = - new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree); - while (true) { - if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) - != fetchSchemaResult) { - break; - } - } - } - } - } - } - - while (true) { - if (!fetchedResultMap.containsKey(devicePath)) { - break; - } - } - ClusterSchemaTree schemaTree = new ClusterSchemaTree(); - Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath); - if (pair.left.decrementAndGet() == 0) { - fetchedResultMap.remove(devicePath); - } - - return null; + return result; } ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) { @@ -207,58 +157,91 @@ class ClusterSchemaFetchExecutor { private class DeviceSchemaFetchTaskExecutor { - private volatile DeviceSchemaFetchTask waitingTask; + // buffer all the requests, waiting for execution + private volatile DeviceSchemaFetchTask newTask; + // task on executing private volatile DeviceSchemaFetchTask executingTask; - private volatile int restThreadNum; - - private volatile ClusterSchemaTree fetchedResult; - + // used for concurrent control of newTask R/W operation private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private ClusterSchemaTree execute(List<String> measurements) { - boolean needNewTask = false; - readWriteLock.readLock().lock(); - try { - if (executingTask != null) { - needNewTask = !executingTask.checkAndAddWaitingThread(measurements); - } - if (needNewTask) { - DeviceSchemaFetchTask task = waitingTask; - synchronized (this) { - readWriteLock.readLock().unlock(); - readWriteLock.writeLock().lock(); - try { - if (waitingTask == null) { - waitingTask = new DeviceSchemaFetchTask(); - task = waitingTask; + // thread-safe, protected by deviceSchemaFetchTaskExecutorMap.compute + private int referenceCount = 0; + + private ClusterSchemaTree execute(PartialPath devicePath, List<String> measurements) { + DeviceSchemaFetchTask task = this.executingTask; + // check whether the executing task can cover this request, if so, just waiting for this task + if (task == null || !task.canCoverRequest(measurements)) { + readWriteLock.readLock().lock(); + try { + if ((task = this.newTask) == null) { + synchronized (this) { + if ((task = this.newTask) == null) { + task = this.newTask = new DeviceSchemaFetchTask(); } - waitingTask.addWaitingThread(measurements); - } finally { - readWriteLock.writeLock().unlock(); } } - synchronized (task) { + // this operation shall be blocked by task submitting operation + // since once the task has been submitted, the info new added requests will be invalid + task.addRequest(measurements); + } finally { + readWriteLock.readLock().unlock(); + } + if (!task.isSubmitting()) { + if (task.markSubmitting()) { + // only one thread will execute this block + while (true) { + // waiting for execution + if (this.executingTask == null) { + // block all request adding operation + readWriteLock.writeLock().lock(); + try { + // make this task as executing state for new request check + this.executingTask = task; + // make this field free for buffering new task + this.newTask = null; + break; + } finally { + readWriteLock.writeLock().unlock(); + } + } + } + // do execution and save fetched result + task.saveResult( + executeSchemaFetchQuery( + new SchemaFetchStatement( + task.generatePatternTree(devicePath), + templateSetInfoProvider.apply(devicePath), + false))); + // release this field for new task execution + this.executingTask = null; } } - } finally { - readWriteLock.readLock().unlock(); } - if (!executingTask.checkAndAddWaitingThread(measurements)) {} + // each request takes needed result from the fetched schema tree + return task.extractResult(devicePath, measurements); + } + + private void incReferenceCount() { + referenceCount++; + } - return null; + private int decAndGetReferenceCount() { + return --referenceCount; } } private static class DeviceSchemaFetchTask { - private int waitingThreadNum = 0; - private final Set<String> measurementSet = new HashSet<>(); - private boolean checkAndAddWaitingThread(List<String> measurements) { + private volatile boolean hasSubmitThread = false; + + private volatile ClusterSchemaTree taskResult; + + private boolean canCoverRequest(List<String> measurements) { if (measurementSet.size() < measurements.size()) { return false; } @@ -267,13 +250,50 @@ class ClusterSchemaFetchExecutor { return false; } } - waitingThreadNum++; + return true; } - private void addWaitingThread(List<String> measurements) { - waitingThreadNum++; + private void addRequest(List<String> measurements) { measurementSet.addAll(measurements); } + + private boolean isSubmitting() { + return hasSubmitThread; + } + + private boolean markSubmitting() { + if (hasSubmitThread) { + return false; + } + synchronized (this) { + if (hasSubmitThread) { + return false; + } + hasSubmitThread = true; + return true; + } + } + + private PathPatternTree generatePatternTree(PartialPath devicePath) { + PathPatternTree patternTree = new PathPatternTree(); + for (String measurement : measurementSet) { + patternTree.appendFullPath(devicePath, measurement); + } + return patternTree; + } + + private void saveResult(ClusterSchemaTree clusterSchemaTree) { + taskResult = clusterSchemaTree; + } + + private ClusterSchemaTree extractResult(PartialPath devicePath, List<String> measurements) { + while (true) { + if (taskResult != null) { + break; + } + } + return taskResult.extractDeviceSubTree(devicePath, measurements); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java index 52eabcdcc6..6a80668a5a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java @@ -124,9 +124,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } List<PartialPath> fullPathList = new ArrayList<>(); + Map<PartialPath, List<String>> deviceMap = new HashMap<>(); for (PartialPath pattern : pathPatternList) { if (!pattern.hasWildcard()) { fullPathList.add(pattern); + deviceMap + .computeIfAbsent(pattern.getDevicePath(), k -> new ArrayList<>()) + .add(pattern.getMeasurement()); } } @@ -164,9 +168,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } } - schemaTree = - clusterSchemaFetchExecutor.executeSchemaFetchQuery( - new SchemaFetchStatement(patternTree, templateMap, false)); + if (deviceMap.size() == 1) { + Map.Entry<PartialPath, List<String>> entry = deviceMap.entrySet().iterator().next(); + schemaTree = + clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(entry.getKey(), entry.getValue()); + } else { + schemaTree = + clusterSchemaFetchExecutor.executeSchemaFetchQuery( + new SchemaFetchStatement(patternTree, templateMap, false)); + } // only cache the schema fetched by full path List<MeasurementPath> measurementPathList; @@ -204,11 +214,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } // try fetch the missing schema from remote and cache fetched schema - PathPatternTree patternTree = new PathPatternTree(); - for (int index : indexOfMissingMeasurements) { - patternTree.appendFullPath(devicePath, measurements[index]); - } - ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree); + ClusterSchemaTree remoteSchemaTree = + clusterSchemaFetchExecutor.fetchSchemaOfOneDevice( + devicePath, + indexOfMissingMeasurements.stream() + .map(index -> measurements[index]) + .collect(Collectors.toList())); if (!remoteSchemaTree.isEmpty()) { schemaTree.mergeSchemaTree(remoteSchemaTree); schemaCache.put(remoteSchemaTree);
