This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch concurrent_schema_fetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0ebd32b922e10130f62b14c1d4bd41aba0a9fce7 Author: Marccos <[email protected]> AuthorDate: Wed Dec 14 20:19:48 2022 +0800 save trial --- .../analyze/schema/ClusterSchemaFetchExecutor.java | 50 +++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 2008dadaeb..7f25b2f640 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -48,6 +48,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -79,7 +81,6 @@ class ClusterSchemaFetchExecutor { } ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) { - final AtomicBoolean shouldWait = new AtomicBoolean(true); executingTaskMap.compute( devicePath, @@ -204,6 +205,53 @@ class ClusterSchemaFetchExecutor { } } + private class DeviceSchemaFetchTaskExecutor { + + private volatile DeviceSchemaFetchTask waitingTask; + + private volatile DeviceSchemaFetchTask executingTask; + + private volatile int restThreadNum; + + private volatile ClusterSchemaTree fetchedResult; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + private ClusterSchemaTree execute(List<String> measurements) { + boolean needNewTask = false; + readWriteLock.readLock().lock(); + try { + if (executingTask != null) { + needNewTask = !executingTask.checkAndAddWaitingThread(measurements); + } + if (needNewTask) { + DeviceSchemaFetchTask task = waitingTask; + synchronized (this) { + readWriteLock.readLock().unlock(); + readWriteLock.writeLock().lock(); + try { + if (waitingTask == null) { + waitingTask = new DeviceSchemaFetchTask(); + task = waitingTask; + } + waitingTask.addWaitingThread(measurements); + } finally { + readWriteLock.writeLock().unlock(); + } + } + synchronized (task) { + } + } + } finally { + readWriteLock.readLock().unlock(); + } + + if (!executingTask.checkAndAddWaitingThread(measurements)) {} + + return null; + } + } + private static class DeviceSchemaFetchTask { private int waitingThreadNum = 0;
