This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_some_bugs in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0e86f534151197796d596b6f7253880e011b1c54 Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu Apr 28 22:43:20 2022 +0800 fix some bugs when debug --- .../main/java/org/apache/iotdb/confignode/manager/ConfigManager.java | 5 ++--- .../main/java/org/apache/iotdb/commons/partition/DataPartition.java | 2 +- .../java/org/apache/iotdb/commons/partition/SchemaPartition.java | 2 +- .../java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java | 2 +- .../db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java | 3 +++ server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java | 2 +- .../org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java | 4 ++-- .../org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java | 1 + .../iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java | 5 ----- .../org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java | 2 +- 10 files changed, 13 insertions(+), 15 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 94ac7c68c3..c8b55e1bdc 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -198,7 +198,6 @@ public class ConfigManager implements Manager { if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { List<String> devicePaths = patternTree.findAllDevicePaths(); List<String> storageGroups = getClusterSchemaManager().getStorageGroupNames(); - GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq(); Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>(); @@ -207,7 +206,7 @@ public class ConfigManager implements Manager { for (String devicePath : devicePaths) { boolean matchStorageGroup = false; for (String storageGroup : storageGroups) { - if (devicePath.contains(storageGroup)) { + if (devicePath.startsWith(storageGroup + ".")) { matchStorageGroup = true; if (devicePath.contains("*")) { // Get all SchemaPartitions of this StorageGroup if the devicePath contains "*" @@ -263,7 +262,7 @@ public class ConfigManager implements Manager { if (!devicePath.contains("*")) { // Only check devicePaths that without "*" for (String storageGroup : storageGroups) { - if (devicePath.contains(storageGroup)) { + if (devicePath.startsWith(storageGroup + ".")) { partitionSlotsMap .computeIfAbsent(storageGroup, key -> new ArrayList<>()) .add(getPartitionManager().getSeriesPartitionSlot(devicePath)); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 5156ba47af..8c150d0ba4 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -104,7 +104,7 @@ public class DataPartition extends Partition { private String getStorageGroupByDevice(String deviceName) { for (String storageGroup : dataPartitionMap.keySet()) { - if (deviceName.startsWith(storageGroup)) { + if (deviceName.startsWith(storageGroup + ".")) { return storageGroup; } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java index a93048ff83..2c8e415199 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java @@ -63,7 +63,7 @@ public class SchemaPartition extends Partition { private String getStorageGroupByDevice(String deviceName) { for (String storageGroup : schemaPartitionMap.keySet()) { - if (deviceName.startsWith(storageGroup)) { + if (deviceName.startsWith(storageGroup + ".")) { return storageGroup; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java index 2bf2d8e72a..bc5b1c8994 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java @@ -228,7 +228,7 @@ public class SchemaTree { */ public String getBelongedStorageGroup(PartialPath path) { for (String storageGroup : storageGroups) { - if (path.getFullPath().startsWith(storageGroup)) { + if (path.getFullPath().startsWith(storageGroup + ".")) { return storageGroup; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java index 245dbcc435..6bf7fad34a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java @@ -84,6 +84,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher { client.close(); } throw e; + } catch (Exception e) { + LOGGER.error("unexpected exception", e); + throw e; } finally { if (client != null) { client.returnSelf(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java index 14950ed94c..e7fb2e2b5c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java @@ -141,7 +141,7 @@ public class MemoryPool { * return 0. */ public synchronized long tryCancel(ListenableFuture<Void> future) { - if (future.isDone()) { + if (future == null || future.isDone()) { return 0L; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java index b6426dca9e..1d1e0ca64a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java @@ -432,7 +432,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { for (String devicePath : devicePaths) { boolean hit = false; for (String storageGroup : storageGroupCache) { - if (devicePath.startsWith(storageGroup)) { + if (devicePath.startsWith(storageGroup + ".")) { deviceToStorageGroupMap.put(devicePath, storageGroup); hit = true; break; @@ -558,7 +558,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { if (!device.contains("*")) { String storageGroup = null; for (String storageGroupName : storageGroupNames) { - if (device.startsWith(storageGroupName)) { + if (device.startsWith(storageGroupName + ".")) { storageGroup = storageGroupName; break; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java index 3c9329c469..866d7ce0d4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java @@ -536,6 +536,7 @@ public class DistributionPlanner { FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId()); sinkNode.setChild(exchangeNode.getChild()); sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId()); + // Record the source node info in the ExchangeNode so that we can keep the connection of // these nodes/fragments exchangeNode.setRemoteSourceNode(sinkNode); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java index 09ddd063ed..ae97ffff0e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.header.ColumnHeader; -import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType; @@ -110,8 +109,6 @@ public class ExchangeNode extends PlanNode { } public static ExchangeNode deserialize(ByteBuffer byteBuffer) { - FragmentSinkNode fragmentSinkNode = - (FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer); TEndPoint endPoint = new TEndPoint( ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer)); @@ -120,14 +117,12 @@ public class ExchangeNode extends PlanNode { PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); ExchangeNode exchangeNode = new ExchangeNode(planNodeId); exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId); - exchangeNode.setRemoteSourceNode(fragmentSinkNode); return exchangeNode; } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.EXCHANGE.serialize(byteBuffer); - remoteSourceNode.serialize(byteBuffer); ReadWriteIOUtils.write(upstreamEndpoint.getIp(), byteBuffer); ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer); upstreamInstanceId.serialize(byteBuffer); diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java index 493b8466a2..86810c8372 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java @@ -137,7 +137,7 @@ public class InternalServiceImpl implements InternalService.Iface { // TODO need to be implemented and currently in order not to print NotImplementedException log, // we simply return null - return null; + return new TCancelResp(true); // throw new NotImplementedException(); }
