This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira1865_master in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5cb6baf27fed1a4f740ddf2176db99206148c26a Author: LebronAl <[email protected]> AuthorDate: Fri Nov 12 09:52:24 2021 +0800 fix --- .../iotdb/cluster/query/reader/DataSourceInfo.java | 5 +-- .../query/reader/mult/MultDataSourceInfo.java | 5 +-- .../iotdb/db/query/control/QueryFileManager.java | 36 ++++++++++++---------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java index 02be077..57eb9d8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java @@ -95,8 +95,6 @@ public class DataSourceInfo { if (newReaderId != null) { logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node); if (newReaderId != -1) { - // register the node so the remote resources can be released - context.registerRemoteNode(node, partitionGroup.getHeader()); this.readerId = newReaderId; this.curSource = node; this.curPos = nextNodePos; @@ -114,6 +112,9 @@ public class DataSourceInfo { logger.error("Cannot query {} from {}", this.request.path, node, e); } catch (Exception e) { logger.error("Cannot query {} from {}", this.request.path, node, e); + } finally { + // register the node so the remote resources can be released + context.registerRemoteNode(node, partitionGroup.getHeader()); } nextNodePos = (nextNodePos + 1) % this.nodes.size(); if (nextNodePos == this.curPos) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java index e849f04..1521ba3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java @@ -100,8 +100,6 @@ public class MultDataSourceInfo { if (newReaderId != null) { logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node); if (newReaderId != -1) { - // register the node so the remote resources can be released - context.registerRemoteNode(node, partitionGroup.getHeader()); this.readerId = newReaderId; this.curSource = node; this.curPos = nextNodePos; @@ -119,6 +117,9 @@ public class MultDataSourceInfo { logger.error("Cannot query {} from {}", this.request.path, node, e); } catch (Exception e) { logger.error("Cannot query {} from {}", this.request.path, node, e); + } finally { + // register the node so the remote resources can be released + context.registerRemoteNode(node, partitionGroup.getHeader()); } nextNodePos = (nextNodePos + 1) % this.nodes.size(); if (nextNodePos == this.curPos) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java index d75bbe3..70ee622 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java @@ -21,11 +21,9 @@ package org.apache.iotdb.db.query.control; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -36,9 +34,9 @@ import java.util.concurrent.ConcurrentHashMap; public class QueryFileManager { /** Map<queryId, Set<filePaths>> */ - private Map<Long, Set<TsFileResource>> sealedFilePathsMap; + private Map<Long, Map<TsFileResource, TsFileResource>> sealedFilePathsMap; - private Map<Long, Set<TsFileResource>> unsealedFilePathsMap; + private Map<Long, Map<TsFileResource, TsFileResource>> unsealedFilePathsMap; QueryFileManager() { sealedFilePathsMap = new ConcurrentHashMap<>(); @@ -50,8 +48,8 @@ public class QueryFileManager { * must be invoked. */ void addQueryId(long queryId) { - sealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>()); - unsealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>()); + sealedFilePathsMap.computeIfAbsent(queryId, x -> new ConcurrentHashMap<>()); + unsealedFilePathsMap.computeIfAbsent(queryId, x -> new ConcurrentHashMap<>()); } /** Add the unique file paths to sealedFilePathsMap and unsealedFilePathsMap. */ @@ -73,10 +71,10 @@ public class QueryFileManager { // this file may be deleted just before we lock it if (tsFileResource.isDeleted()) { - Map<Long, Set<TsFileResource>> pathMap = + Map<Long, Map<TsFileResource, TsFileResource>> pathMap = !isClosed ? unsealedFilePathsMap : sealedFilePathsMap; // This resource may be removed by other threads of this query. - if (pathMap.get(queryId).remove(tsFileResource)) { + if (pathMap.get(queryId).remove(tsFileResource) != null) { FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed); } iterator.remove(); @@ -93,7 +91,7 @@ public class QueryFileManager { sealedFilePathsMap.computeIfPresent( queryId, (k, v) -> { - for (TsFileResource tsFile : v) { + for (TsFileResource tsFile : v.keySet()) { FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true); } return null; @@ -101,7 +99,7 @@ public class QueryFileManager { unsealedFilePathsMap.computeIfPresent( queryId, (k, v) -> { - for (TsFileResource tsFile : v) { + for (TsFileResource tsFile : v.keySet()) { FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false); } return null; @@ -115,11 +113,17 @@ public class QueryFileManager { * not return null. */ void addFilePathToMap(long queryId, TsFileResource tsFile, boolean isClosed) { - Map<Long, Set<TsFileResource>> pathMap = isClosed ? sealedFilePathsMap : unsealedFilePathsMap; - // TODO this is not an atomic operation, is there concurrent problem? - if (!pathMap.get(queryId).contains(tsFile)) { - pathMap.get(queryId).add(tsFile); - FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed); - } + Map<Long, Map<TsFileResource, TsFileResource>> pathMap = + isClosed ? sealedFilePathsMap : unsealedFilePathsMap; + // Although there are no concurrency issues here at the moment, I've implemented thread-safe + // code here to avoid leaving holes for future newcomers. + pathMap + .get(queryId) + .computeIfAbsent( + tsFile, + k -> { + FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed); + return k; + }); } }
