This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira1865_master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5cb6baf27fed1a4f740ddf2176db99206148c26a
Author: LebronAl <[email protected]>
AuthorDate: Fri Nov 12 09:52:24 2021 +0800

    fix
---
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  5 +--
 .../query/reader/mult/MultDataSourceInfo.java      |  5 +--
 .../iotdb/db/query/control/QueryFileManager.java   | 36 ++++++++++++----------
 3 files changed, 26 insertions(+), 20 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 02be077..57eb9d8 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -95,8 +95,6 @@ public class DataSourceInfo {
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, 
request.path, node);
           if (newReaderId != -1) {
-            // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -114,6 +112,9 @@ public class DataSourceInfo {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
       } catch (Exception e) {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
+      } finally {
+        // register the node so the remote resources can be released
+        context.registerRemoteNode(node, partitionGroup.getHeader());
       }
       nextNodePos = (nextNodePos + 1) % this.nodes.size();
       if (nextNodePos == this.curPos) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index e849f04..1521ba3 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -100,8 +100,6 @@ public class MultDataSourceInfo {
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, 
request.path, node);
           if (newReaderId != -1) {
-            // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -119,6 +117,9 @@ public class MultDataSourceInfo {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
       } catch (Exception e) {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
+      } finally {
+        // register the node so the remote resources can be released
+        context.registerRemoteNode(node, partitionGroup.getHeader());
       }
       nextNodePos = (nextNodePos + 1) % this.nodes.size();
       if (nextNodePos == this.curPos) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java 
b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index d75bbe3..70ee622 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -21,11 +21,9 @@ package org.apache.iotdb.db.query.control;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -36,9 +34,9 @@ import java.util.concurrent.ConcurrentHashMap;
 public class QueryFileManager {
 
   /** Map<queryId, Set<filePaths>> */
-  private Map<Long, Set<TsFileResource>> sealedFilePathsMap;
+  private Map<Long, Map<TsFileResource, TsFileResource>> sealedFilePathsMap;
 
-  private Map<Long, Set<TsFileResource>> unsealedFilePathsMap;
+  private Map<Long, Map<TsFileResource, TsFileResource>> unsealedFilePathsMap;
 
   QueryFileManager() {
     sealedFilePathsMap = new ConcurrentHashMap<>();
@@ -50,8 +48,8 @@ public class QueryFileManager {
    * must be invoked.
    */
   void addQueryId(long queryId) {
-    sealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>());
-    unsealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>());
+    sealedFilePathsMap.computeIfAbsent(queryId, x -> new 
ConcurrentHashMap<>());
+    unsealedFilePathsMap.computeIfAbsent(queryId, x -> new 
ConcurrentHashMap<>());
   }
 
   /** Add the unique file paths to sealedFilePathsMap and 
unsealedFilePathsMap. */
@@ -73,10 +71,10 @@ public class QueryFileManager {
 
       // this file may be deleted just before we lock it
       if (tsFileResource.isDeleted()) {
-        Map<Long, Set<TsFileResource>> pathMap =
+        Map<Long, Map<TsFileResource, TsFileResource>> pathMap =
             !isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
         // This resource may be removed by other threads of this query.
-        if (pathMap.get(queryId).remove(tsFileResource)) {
+        if (pathMap.get(queryId).remove(tsFileResource) != null) {
           
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, 
isClosed);
         }
         iterator.remove();
@@ -93,7 +91,7 @@ public class QueryFileManager {
     sealedFilePathsMap.computeIfPresent(
         queryId,
         (k, v) -> {
-          for (TsFileResource tsFile : v) {
+          for (TsFileResource tsFile : v.keySet()) {
             
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
           }
           return null;
@@ -101,7 +99,7 @@ public class QueryFileManager {
     unsealedFilePathsMap.computeIfPresent(
         queryId,
         (k, v) -> {
-          for (TsFileResource tsFile : v) {
+          for (TsFileResource tsFile : v.keySet()) {
             
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
           }
           return null;
@@ -115,11 +113,17 @@ public class QueryFileManager {
    * not return null.
    */
   void addFilePathToMap(long queryId, TsFileResource tsFile, boolean isClosed) 
{
-    Map<Long, Set<TsFileResource>> pathMap = isClosed ? sealedFilePathsMap : 
unsealedFilePathsMap;
-    // TODO this is not an atomic operation, is there concurrent problem?
-    if (!pathMap.get(queryId).contains(tsFile)) {
-      pathMap.get(queryId).add(tsFile);
-      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
isClosed);
-    }
+    Map<Long, Map<TsFileResource, TsFileResource>> pathMap =
+        isClosed ? sealedFilePathsMap : unsealedFilePathsMap;
+    // Although there are no concurrency issues here at the moment, I've 
implemented thread-safe
+    // code here to avoid leaving holes for future newcomers.
+    pathMap
+        .get(queryId)
+        .computeIfAbsent(
+            tsFile,
+            k -> {
+              
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
+              return k;
+            });
   }
 }

Reply via email to