This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0c5416ecfa36a7b1c69e12cf8c23fcd2605ca590
Author: Marccos <[email protected]>
AuthorDate: Thu Dec 15 17:39:46 2022 +0800

    fix bug
---
 .../db/mpp/common/schematree/ClusterSchemaTree.java | 14 +++++++++-----
 .../analyze/schema/ClusterSchemaFetchExecutor.java  | 20 +++++++++-----------
 .../common/schematree/ClusterSchemaTreeTest.java    | 21 +++++++++++++++++++++
 3 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 9556086fab..fc0243e607 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -39,6 +39,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
@@ -327,10 +328,10 @@ public class ClusterSchemaTree implements ISchemaTree {
   }
 
   public ClusterSchemaTree extractDeviceSubTree(PartialPath devicePath, List<String> measurements) {
-    SchemaNode root = new SchemaInternalNode(PATH_ROOT);
+    SchemaNode clonedRoot = new SchemaInternalNode(PATH_ROOT);
     String[] nodes = devicePath.getNodes();
-    SchemaNode cur = root;
-    SchemaNode clonedCur = root;
+    SchemaNode cur = this.root;
+    SchemaNode clonedCur = clonedRoot;
     SchemaNode clonedChild;
     for (int i = 1; i < nodes.length - 1; i++) {
       cur = cur.getChild(nodes[i]);
@@ -350,12 +351,13 @@ public class ClusterSchemaTree implements ISchemaTree {
 
     SchemaEntityNode deviceNode = cur.getAsEntityNode();
     SchemaEntityNode clonedDeviceNode = new SchemaEntityNode(nodes[nodes.length - 1]);
+    clonedCur.addChild(nodes[nodes.length - 1], clonedDeviceNode);
     clonedDeviceNode.setAligned(deviceNode.isAligned());
     SchemaNode child;
     SchemaMeasurementNode measurementNode;
     for (String measurement : measurements) {
       child = deviceNode.getChild(measurement);
-      if (child.isMeasurement()) {
+      if (child != null && child.isMeasurement()) {
         measurementNode = child.getAsMeasurementNode();
         clonedDeviceNode.addChild(measurementNode.getName(), measurementNode);
         if (measurementNode.getAlias() != null) {
@@ -363,6 +365,8 @@ public class ClusterSchemaTree implements ISchemaTree {
         }
       }
     }
-    return new ClusterSchemaTree(root);
+    ClusterSchemaTree result = new ClusterSchemaTree(clonedRoot);
+    result.setDatabases(Collections.singleton(getBelongedDatabase(devicePath)));
+    return result;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index a76f1a23c0..cee5775039 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -80,15 +81,18 @@ class ClusterSchemaFetchExecutor {
                 (key, value) -> {
                   if (value == null) {
                     value = new DeviceSchemaFetchTaskExecutor();
-                    value.incReferenceCount();
                   }
+                  value.incReferenceCount();
                   return value;
                 })
             .execute(devicePath, measurements);
     deviceSchemaFetchTaskExecutorMap.compute(
         devicePath,
         (key, value) -> {
-          if (value == null || value.decAndGetReferenceCount() == 0) {
+          if (value == null) {
+            throw new IllegalStateException();
+          }
+          if (value.decAndGetReferenceCount() == 0) {
             return null;
           }
           return value;
@@ -235,23 +239,17 @@ class ClusterSchemaFetchExecutor {
 
   private static class DeviceSchemaFetchTask {
 
-    private final Set<String> measurementSet = new HashSet<>();
+    private final Set<String> measurementSet = Collections.synchronizedSet(new HashSet<>());
 
     private volatile boolean hasSubmitThread = false;
 
-    private volatile ClusterSchemaTree taskResult;
+    private volatile ClusterSchemaTree taskResult = null;
 
     private boolean canCoverRequest(List<String> measurements) {
       if (measurementSet.size() < measurements.size()) {
         return false;
       }
-      for (String measurement : measurements) {
-        if (!measurementSet.contains(measurement)) {
-          return false;
-        }
-      }
-
-      return true;
+      return measurementSet.containsAll(measurements);
     }
 
     private void addRequest(List<String> measurements) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java
index 000bdc10de..0dec1a056f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java
@@ -37,6 +37,7 @@ import org.mockito.internal.util.collections.Sets;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -592,4 +593,24 @@ public class ClusterSchemaTreeTest {
             .searchDeviceSchemaInfo(new PartialPath("root.sg.d1"), Collections.singletonList("s1"))
             .isAligned());
   }
+
+  @Test
+  public void testExtractDeviceSubTree() throws Exception {
+    SchemaNode root = generateSchemaTree();
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree(root);
+    schemaTree.setDatabases(Collections.singleton("root.sg"));
+    ClusterSchemaTree subTree =
+        schemaTree.extractDeviceSubTree(
+            new PartialPath("root.sg.d1"), Arrays.asList("s1", "status"));
+    Assert.assertEquals(
+        2,
+        subTree
+            .searchDeviceSchemaInfo(new PartialPath("root.sg.d1"), Arrays.asList("s1", "status"))
+            .getMeasurementSchemaList()
+            .size());
+    ClusterSchemaTree emptyTree =
+        schemaTree.extractDeviceSubTree(
+            new PartialPath("root.sg.d"), Collections.singletonList("s1"));
+    Assert.assertTrue(emptyTree.isEmpty());
+  }
 }

Reply via email to