This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch concurrent_schema_fetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c5416ecfa36a7b1c69e12cf8c23fcd2605ca590 Author: Marccos <[email protected]> AuthorDate: Thu Dec 15 17:39:46 2022 +0800 fix bug --- .../db/mpp/common/schematree/ClusterSchemaTree.java | 14 +++++++++----- .../analyze/schema/ClusterSchemaFetchExecutor.java | 20 +++++++++----------- .../common/schematree/ClusterSchemaTreeTest.java | 21 +++++++++++++++++++++ 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java index 9556086fab..fc0243e607 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java @@ -39,6 +39,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Map; @@ -327,10 +328,10 @@ public class ClusterSchemaTree implements ISchemaTree { } public ClusterSchemaTree extractDeviceSubTree(PartialPath devicePath, List<String> measurements) { - SchemaNode root = new SchemaInternalNode(PATH_ROOT); + SchemaNode clonedRoot = new SchemaInternalNode(PATH_ROOT); String[] nodes = devicePath.getNodes(); - SchemaNode cur = root; - SchemaNode clonedCur = root; + SchemaNode cur = this.root; + SchemaNode clonedCur = clonedRoot; SchemaNode clonedChild; for (int i = 1; i < nodes.length - 1; i++) { cur = cur.getChild(nodes[i]); @@ -350,12 +351,13 @@ public class ClusterSchemaTree implements ISchemaTree { SchemaEntityNode deviceNode = cur.getAsEntityNode(); SchemaEntityNode clonedDeviceNode = new SchemaEntityNode(nodes[nodes.length - 1]); + clonedCur.addChild(nodes[nodes.length - 1], clonedDeviceNode); clonedDeviceNode.setAligned(deviceNode.isAligned()); SchemaNode child; SchemaMeasurementNode measurementNode; for (String measurement : measurements) { child = deviceNode.getChild(measurement); - if (child.isMeasurement()) { + if (child != null && child.isMeasurement()) { measurementNode = child.getAsMeasurementNode(); clonedDeviceNode.addChild(measurementNode.getName(), measurementNode); if (measurementNode.getAlias() != null) { @@ -363,6 +365,8 @@ public class ClusterSchemaTree implements ISchemaTree { } } } - return new ClusterSchemaTree(root); + ClusterSchemaTree result = new ClusterSchemaTree(clonedRoot); + result.setDatabases(Collections.singleton(getBelongedDatabase(devicePath))); + return result; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java index a76f1a23c0..cee5775039 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,15 +81,18 @@ class ClusterSchemaFetchExecutor { (key, value) -> { if (value == null) { value = new DeviceSchemaFetchTaskExecutor(); - value.incReferenceCount(); } + value.incReferenceCount(); return value; }) .execute(devicePath, measurements); deviceSchemaFetchTaskExecutorMap.compute( devicePath, (key, value) -> { - if (value == null || value.decAndGetReferenceCount() == 0) { + if (value == null) { + throw new IllegalStateException(); + } + if (value.decAndGetReferenceCount() == 0) { return null; } return value; @@ -235,23 +239,17 @@ class ClusterSchemaFetchExecutor { private static class DeviceSchemaFetchTask { - private final Set<String> measurementSet = new HashSet<>(); + private final Set<String> measurementSet = Collections.synchronizedSet(new HashSet<>()); private volatile boolean hasSubmitThread = false; - private volatile ClusterSchemaTree taskResult; + private volatile ClusterSchemaTree taskResult = null; private boolean canCoverRequest(List<String> measurements) { if (measurementSet.size() < measurements.size()) { return false; } - for (String measurement : measurements) { - if (!measurementSet.contains(measurement)) { - return false; - } - } - - return true; + return measurementSet.containsAll(measurements); } private void addRequest(List<String> measurements) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java index 000bdc10de..0dec1a056f 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java @@ -37,6 +37,7 @@ import org.mockito.internal.util.collections.Sets; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -592,4 +593,24 @@ public class ClusterSchemaTreeTest { .searchDeviceSchemaInfo(new PartialPath("root.sg.d1"), Collections.singletonList("s1")) .isAligned()); } + + @Test + public void testExtractDeviceSubTree() throws Exception { + SchemaNode root = generateSchemaTree(); + ClusterSchemaTree schemaTree = new ClusterSchemaTree(root); + schemaTree.setDatabases(Collections.singleton("root.sg")); + ClusterSchemaTree subTree = + schemaTree.extractDeviceSubTree( + new PartialPath("root.sg.d1"), Arrays.asList("s1", "status")); + Assert.assertEquals( + 2, + subTree + .searchDeviceSchemaInfo(new PartialPath("root.sg.d1"), Arrays.asList("s1", "status")) + .getMeasurementSchemaList() + .size()); + ClusterSchemaTree emptyTree = + schemaTree.extractDeviceSubTree( + new PartialPath("root.sg.d"), Collections.singletonList("s1")); + Assert.assertTrue(emptyTree.isEmpty()); + } }
