This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch threadlocal_for_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 291f361bbc07a4688e025b0e31cd305dd7de0f10
Author: xiangdong huang <[email protected]>
AuthorDate: Fri Mar 29 01:01:56 2019 +0800

    refactor QueryTokenManager: move threadlocal as QuerySession
---
 .../db/query/control/OpenedFilePathsManager.java   | 18 ++----
 .../db/query/control/QueryDataSourceManager.java   |  2 +-
 .../iotdb/db/query/control/QuerySession.java       | 73 ++++++++++++++++++++++
 .../iotdb/db/query/control/QueryTokenManager.java  | 31 ++++-----
 .../iotdb/db/query/executor/EngineQueryRouter.java | 38 +++++------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  7 +--
 .../db/integration/IoTDBEngineTimeGeneratorIT.java |  6 +-
 .../db/integration/IoTDBSequenceDataQueryIT.java   |  7 ++-
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  |  9 +--
 .../db/query/control/FileReaderManagerTest.java    |  6 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  3 +-
 11 files changed, 124 insertions(+), 76 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index e6ffba3..409b369 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -32,10 +32,7 @@ import 
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
  */
 public class OpenedFilePathsManager {
 
-  /**
-   * Each jdbc request has an unique jod id, job id is stored in thread local 
variable jobIdContainer.
-   */
-  private ThreadLocal<Long> jobIdContainer;
+
 
   /**
    * Map<jobId, Set<filePaths>>
@@ -44,7 +41,6 @@ public class OpenedFilePathsManager {
   private ConcurrentHashMap<Long, Set<String>> unclosedFilePathsMap;
 
   private OpenedFilePathsManager() {
-    jobIdContainer = new ThreadLocal<>();
     closedFilePathsMap = new ConcurrentHashMap<>();
     unclosedFilePathsMap = new ConcurrentHashMap<>();
   }
@@ -56,8 +52,7 @@ public class OpenedFilePathsManager {
   /**
    * Set job id for current request thread. When a query request is created 
firstly, this method must be invoked.
    */
-  public void setJobIdForCurrentRequestThread(long jobId) {
-    jobIdContainer.set(jobId);
+  public void addJobId(long jobId) {
     closedFilePathsMap.put(jobId, new HashSet<>());
     unclosedFilePathsMap.put(jobId, new HashSet<>());
   }
@@ -65,7 +60,7 @@ public class OpenedFilePathsManager {
   /**
    * Add the unique file paths to closedFilePathsMap and unclosedFilePathsMap.
    */
-  void addUsedFilesForCurrentRequestThread(long jobId, QueryDataSource 
dataSource) {
+  public void addUsedFilesForGivenJob(long jobId, QueryDataSource dataSource) {
     for (TsFileResource tsFileResource : 
dataSource.getSeqDataSource().getSealedTsFiles()) {
       String sealedFilePath = tsFileResource.getFilePath();
       addFilePathToMap(jobId, sealedFilePath, true);
@@ -88,11 +83,7 @@ public class OpenedFilePathsManager {
    * Whenever the jdbc request is closed normally or abnormally, this method 
must be invoked. All file paths used by
    * this jdbc request must be cleared and thus the usage reference must be 
decreased.
    */
-  public void removeUsedFilesForCurrentRequestThread() {
-    if (jobIdContainer.get() != null) {
-      long jobId = jobIdContainer.get();
-      jobIdContainer.remove();
-
+  public void removeUsedFilesForGivenJob(long jobId) {
       for (String filePath : closedFilePathsMap.get(jobId)) {
         FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
false);
       }
@@ -101,7 +92,6 @@ public class OpenedFilePathsManager {
         FileReaderManager.getInstance().decreaseFileReaderReference(filePath, 
true);
       }
       unclosedFilePathsMap.remove(jobId);
-    }
   }
 
   /**
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
index f3fd1f3..add4e19 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
@@ -46,7 +46,7 @@ public class QueryDataSourceManager {
 
     // add used files to current thread request cached map
     OpenedFilePathsManager.getInstance()
-        .addUsedFilesForCurrentRequestThread(jobId, queryDataSource);
+        .addUsedFilesForGivenJob(jobId, queryDataSource);
 
     return queryDataSource;
   }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
new file mode 100644
index 0000000..20415c6
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.control;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class QuerySession {
+  /**
+   * Each jdbc request has an unique jod id, job id is stored in thread local 
variable jobIdContainer.
+   */
+  private ThreadLocal<Long> jobId;
+
+  /**
+   * Each unique jdbc request(query, aggregation or others job) has an unique 
job id. This job id
+   * will always be maintained until the request is closed. In each job, the 
unique file will be
+   * only opened once to avoid too many opened files error.
+   */
+  private AtomicLong jobIdGenerator = new AtomicLong();
+
+  private QuerySession() {
+    this.jobId = new ThreadLocal<Long>(){
+      @Override
+      protected Long initialValue() {
+        super.initialValue();
+        long id = jobIdGenerator.incrementAndGet();
+        OpenedFilePathsManager.getInstance().addJobId(id);
+        QueryTokenManager.getInstance().addJobId(id);
+        return id;
+      }
+    };
+  }
+
+  public long getJobId() {
+    return jobId.get();
+  }
+
+  public static long getCurrentThreadJobId() {
+    return QuerySessionHelper.INSTANCE.jobId.get();
+  }
+
+  private static class QuerySessionHelper {
+
+    private static final QuerySession INSTANCE = new QuerySession();
+
+    private QuerySessionHelper() {
+    }
+  }
+
+  public static QuerySession getCurrentThreadQuerySession() {
+    return QuerySessionHelper.INSTANCE;
+  }
+
+
+
+
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
index 50a2cb4..44ee763 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
@@ -36,18 +36,13 @@ import 
org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
  * <p>
  * Singleton pattern, to manage all query tokens. Each jdbc query request can 
query multiple series,
  * in the processing of querying different device id, the 
<code>FileNodeManager.getInstance().
- * beginQuery</code> and <code>FileNodeManager.getInstance().endQuery</code> 
must be invoked in the
+ * beginQuery</code> and 
<code>FileNodeManager.getInstance().endQueryForGivenJob</code> must be invoked 
in the
  * beginning and ending of jdbc request.
  * </p>
  */
 public class QueryTokenManager {
 
   /**
-   * Each jdbc request has unique jod id, job id is stored in thread local 
variable jobContainer.
-   */
-  private ThreadLocal<Long> jobContainer;
-
-  /**
    * Map&lt;jobId, Map&lt;deviceId, List&lt;token&gt;&gt;&gt;.
    *
    * <p>
@@ -72,21 +67,21 @@ public class QueryTokenManager {
    * <code>FileNodeManager.getInstance().beginQuery(device_2)</code> will be 
invoked again, it
    * returns result token `3` and `4` .
    *
-   * <code>FileNodeManager.getInstance().endQuery(device_1, 1)</code> and
-   * <code>FileNodeManager.getInstance().endQuery(device_2, 2)</code> must be 
invoked no matter how
+   * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_1, 
1)</code> and
+   * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_2, 
2)</code> must be invoked no matter how
    * query process Q1 exits normally or abnormally. So is Q2,
-   * <code>FileNodeManager.getInstance().endQuery(device_1, 3)</code> and
-   * <code>FileNodeManager.getInstance().endQuery(device_2, 4)</code> must be 
invoked
+   * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_1, 
3)</code> and
+   * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_2, 
4)</code> must be invoked
    *
    * Last but no least, to ensure the correctness of write process and query 
process of IoTDB,
    * <code>FileNodeManager.getInstance().beginQuery()</code> and
-   * <code>FileNodeManager.getInstance().endQuery()</code> must be executed 
rightly.
+   * <code>FileNodeManager.getInstance().endQueryForGivenJob()</code> must be 
executed rightly.
    * </p>
    */
   private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>> 
queryTokensMap;
 
+
   private QueryTokenManager() {
-    jobContainer = new ThreadLocal<>();
     queryTokensMap = new ConcurrentHashMap<>();
   }
 
@@ -98,8 +93,7 @@ public class QueryTokenManager {
    * Set job id for current request thread. When a query request is created 
firstly, this method
    * must be invoked.
    */
-  public void setJobIdForCurrentRequestThread(long jobId) {
-    jobContainer.set(jobId);
+  public void addJobId(long jobId) {
     queryTokensMap.put(jobId, new ConcurrentHashMap<>());
   }
 
@@ -136,18 +130,15 @@ public class QueryTokenManager {
    * Whenever the jdbc request is closed normally or abnormally, this method 
must be invoked. All
    * query tokens created by this jdbc request must be cleared.
    */
-  public void endQueryForCurrentRequestThread() throws 
FileNodeManagerException {
-    if (jobContainer.get() != null) {
-      long jobId = jobContainer.get();
-      jobContainer.remove();
-
+  public void endQueryForGivenJob(long jobId) throws FileNodeManagerException {
       for (Map.Entry<String, List<Integer>> entry : 
queryTokensMap.get(jobId).entrySet()) {
         for (int token : entry.getValue()) {
           FileNodeManager.getInstance().endQuery(entry.getKey(), token);
         }
       }
       queryTokensMap.remove(jobId);
-    }
+    // remove usage of opened file paths of current thread
+    OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(jobId);
   }
 
   private void getUniquePaths(IExpression expression, Set<String> deviceIdSet) 
{
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 2e3ce45..2090074 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -23,12 +23,12 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
+import org.apache.iotdb.db.query.control.QuerySession;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import 
org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet;
 import 
org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
@@ -52,12 +52,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
  */
 public class EngineQueryRouter {
 
-  /**
-   * Each unique jdbc request(query, aggregation or others job) has an unique 
job id. This job id
-   * will always be maintained until the request is closed. In each job, the 
unique file will be
-   * only opened once to avoid too many opened files error.
-   */
-  private AtomicLong jobIdGenerator = new AtomicLong();
+
 
   /**
    * execute physical plan.
@@ -65,9 +60,9 @@ public class EngineQueryRouter {
   public QueryDataSet query(QueryExpression queryExpression)
       throws FileNodeManagerException {
 
-    long nextJobId = getNextJobId();
-    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
-    
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    long nextJobId = QuerySession.getCurrentThreadJobId();
+    QueryTokenManager.getInstance().addJobId(nextJobId);
+    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
 
     QueryContext context = new QueryContext();
 
@@ -107,9 +102,9 @@ public class EngineQueryRouter {
       IExpression expression) throws QueryFilterOptimizationException, 
FileNodeManagerException,
       IOException, PathErrorException, ProcessorException {
 
-    long nextJobId = getNextJobId();
-    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
-    
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    long nextJobId = QuerySession.getCurrentThreadJobId();
+    QueryTokenManager.getInstance().addJobId(nextJobId);
+    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
 
     QueryContext context = new QueryContext();
 
@@ -146,9 +141,10 @@ public class EngineQueryRouter {
       throws ProcessorException, QueryFilterOptimizationException, 
FileNodeManagerException,
       PathErrorException, IOException {
 
-    long nextJobId = getNextJobId();
-    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
-    
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    long nextJobId = QuerySession.getCurrentThreadJobId();
+    QueryTokenManager.getInstance().addJobId(nextJobId);
+    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
+
     QueryContext context = new QueryContext();
 
     // check the legitimacy of intervals
@@ -211,9 +207,9 @@ public class EngineQueryRouter {
    */
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillType)
       throws FileNodeManagerException, PathErrorException, IOException {
-    long nextJobId = getNextJobId();
-    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
-    
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    long nextJobId = QuerySession.getCurrentThreadJobId();
+    QueryTokenManager.getInstance().addJobId(nextJobId);
+    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
 
     QueryContext context = new QueryContext();
     FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(nextJobId, 
fillPaths, queryTime,
@@ -244,7 +240,5 @@ public class EngineQueryRouter {
     return merged;
   }
 
-  private synchronized long getNextJobId() {
-    return jobIdGenerator.incrementAndGet();
-  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 834dde3..c4877d8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
+import org.apache.iotdb.db.query.control.QuerySession;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
@@ -185,10 +185,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
     try {
       // end query for all the query tokens created by current thread
-      QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
-
-      // remove usage of opened file paths of current thread
-      
OpenedFilePathsManager.getInstance().removeUsedFilesForCurrentRequestThread();
+      
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
 
       clearAllStatusForCurrentRequest();
     } catch (FileNodeManagerException e) {
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
index 2a45c29..82f7c23 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
@@ -197,7 +197,7 @@ public class IoTDBEngineTimeGeneratorIT {
 
     SingleSeriesExpression singleSeriesExpression = new 
SingleSeriesExpression(pd0s0,
         FilterFactory.and(valueGtEq, timeGt));
-    OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
+    OpenedFilePathsManager.getInstance().addJobId(0);
     QueryContext context = new QueryContext();
     EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, 
singleSeriesExpression,
         context);
@@ -222,7 +222,7 @@ public class IoTDBEngineTimeGeneratorIT {
     Path pd1s0 = new Path(Constant.d1s0);
     ValueFilter.ValueGtEq valueGtEq = ValueFilter.gtEq(5);
 
-    OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
+    OpenedFilePathsManager.getInstance().addJobId(0);
     IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0, 
valueGtEq);
     QueryContext context = new QueryContext();
     EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, 
singleSeriesExpression,
@@ -258,7 +258,7 @@ public class IoTDBEngineTimeGeneratorIT {
     IExpression andExpression = BinaryExpression
         .and(singleSeriesExpression1, singleSeriesExpression2);
 
-    OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
+    OpenedFilePathsManager.getInstance().addJobId(0);
     QueryContext context = new QueryContext();
     EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, 
andExpression, context);
     int cnt = 0;
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
index b910437..d7c091e 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
@@ -27,6 +27,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.control.QuerySession;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
@@ -188,7 +189,7 @@ public class IoTDBSequenceDataQueryIT {
     }
     assertEquals(1000, cnt);
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
   }
 
   @Test
@@ -214,7 +215,7 @@ public class IoTDBSequenceDataQueryIT {
     }
     assertEquals(350, cnt);
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
   }
 
   @Test
@@ -245,7 +246,7 @@ public class IoTDBSequenceDataQueryIT {
     }
     assertEquals(count, cnt);
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
   }
 
 }
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index 387310b..1c991ae 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -29,6 +29,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.control.QuerySession;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
@@ -262,7 +263,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(23400, cnt);
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
   }
 
   @Test
@@ -290,7 +291,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(16440, cnt);
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
   }
 
   @Test
@@ -316,7 +317,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(3012, cnt);
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
   }
 
   @Test
@@ -343,7 +344,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(22800, cnt);
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
   }
 
   @Test
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index fe17a1a..81d6f30 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -62,7 +62,7 @@ public class FileReaderManagerTest {
 
     Thread t1 = new Thread(() -> {
       try {
-        
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(1L);
+        OpenedFilePathsManager.getInstance().addJobId(1L);
 
         for (int i = 1; i <= 6; i++) {
           OpenedFilePathsManager.getInstance().addFilePathToMap(1L, filePath + 
i,
@@ -80,7 +80,7 @@ public class FileReaderManagerTest {
 
     Thread t2 = new Thread(() -> {
       try {
-        
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(2L);
+        OpenedFilePathsManager.getInstance().addJobId(2L);
 
         for (int i = 4; i <= MAX_FILE_SIZE; i++) {
           OpenedFilePathsManager.getInstance().addFilePathToMap(2L, filePath + 
i,
@@ -120,7 +120,7 @@ public class FileReaderManagerTest {
     // }
     // }
 
-    
OpenedFilePathsManager.getInstance().removeUsedFilesForCurrentRequestThread();
+    
OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(QuerySession.getCurrentThreadJobId());
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
     for (int i = 1; i < MAX_FILE_SIZE; i++) {
       File file = new File(filePath + i);
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java 
b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index e45fe1f..ad5a007 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.control.QuerySession;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -60,7 +61,7 @@ public class EnvironmentUtils {
 
   public static void cleanEnv() throws IOException, FileNodeManagerException {
 
-    QueryTokenManager.getInstance().endQueryForCurrentRequestThread();
+    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
 
     // clear opened file streams
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();

Reply via email to