This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 0e4adc5  Fix cluster query memory leak (#4335)
0e4adc5 is described below

commit 0e4adc58cefa325a77b1df232deabb881fcc0629
Author: Xiangwei Wei <[email protected]>
AuthorDate: Mon Nov 8 18:09:36 2021 +0800

    Fix cluster query memory leak (#4335)
---
 .../apache/iotdb/cluster/server/ClientServer.java  |  2 +-
 .../iotdb/db/query/control/SessionManager.java     | 23 +++++++--------------
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 24 +++++++++++++++-------
 3 files changed, 25 insertions(+), 24 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 36de122..ec2c523 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -302,7 +302,7 @@ public class ClientServer extends TSServiceImpl {
    * @throws StorageEngineException
    */
   @Override
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
     // release resources locally
     super.releaseQueryResource(queryId);
     // release resources remotely
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index c3257fb..9fc5ed5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
+import org.apache.iotdb.db.service.TSServiceImpl;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.slf4j.Logger;
@@ -85,7 +86,7 @@ public class SessionManager {
     return sessionId;
   }
 
-  public boolean releaseSessionResource(long sessionId) {
+  public boolean releaseSessionResource(TSServiceImpl service, long sessionId) {
     sessionIdToZoneId.remove(sessionId);
 
     Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId);
@@ -94,7 +95,7 @@ public class SessionManager {
         Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
         if (queryIdSet != null) {
           for (Long queryId : queryIdSet) {
-            releaseQueryResourceNoExceptions(queryId);
+            service.releaseQueryResourceNoExceptions(queryId);
           }
         }
       }
@@ -111,11 +112,11 @@ public class SessionManager {
     return statementId;
   }
 
-  public void closeStatement(long sessionId, long statementId) {
+  public void closeStatement(TSServiceImpl tsService, long sessionId, long statementId) {
     Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
     if (queryIdSet != null) {
       for (Long queryId : queryIdSet) {
-        releaseQueryResourceNoExceptions(queryId);
+        tsService.releaseQueryResourceNoExceptions(queryId);
       }
     }
 
@@ -144,16 +145,6 @@ public class SessionManager {
     QueryResourceManager.getInstance().endQuery(queryId);
   }
 
-  public void releaseQueryResourceNoExceptions(long queryId) {
-    if (queryId != -1) {
-      try {
-        releaseQueryResource(queryId);
-      } catch (Exception e) {
-        LOGGER.warn("Error occurred while releasing query resource: ", e);
-      }
-    }
-  }
-
   public String getUsername(Long sessionId) {
     return sessionIdToUsername.get(sessionId);
   }
@@ -182,8 +173,8 @@ public class SessionManager {
     queryIdToDataSet.remove(queryId);
   }
 
-  public void closeDataset(Long statementId, Long queryId) {
-    releaseQueryResourceNoExceptions(queryId);
+  public void closeDataset(TSServiceImpl tsService, Long statementId, Long queryId) {
+    tsService.releaseQueryResourceNoExceptions(queryId);
     if (statementIdToQueryId.containsKey(statementId)) {
       statementIdToQueryId.get(statementId).remove(queryId);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ad4238d..37127c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -278,7 +278,7 @@ public class TSServiceImpl implements TSIService.Iface {
     sessionManager.removeCurrSessionId();
 
     return new TSStatus(
-        !sessionManager.releaseSessionResource(sessionId)
+        !sessionManager.releaseSessionResource(this, sessionId)
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -305,9 +305,9 @@ public class TSServiceImpl implements TSIService.Iface {
     try {
       if (req.isSetStatementId()) {
         if (req.isSetQueryId()) {
-          sessionManager.closeDataset(req.statementId, req.queryId);
+          sessionManager.closeDataset(this, req.statementId, req.queryId);
         } else {
-          sessionManager.closeStatement(req.sessionId, req.statementId);
+          sessionManager.closeStatement(this, req.sessionId, req.statementId);
         }
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
       } else {
@@ -321,10 +321,20 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   /** release single operation resource */
-  protected void releaseQueryResource(long queryId) throws StorageEngineException {
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
     sessionManager.releaseQueryResource(queryId);
   }
 
+  public void releaseQueryResourceNoExceptions(long queryId) {
+    if (queryId != -1) {
+      try {
+        releaseQueryResource(queryId);
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred while releasing query resource: ", e);
+      }
+    }
+  }
+
   @Override
   public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) {
     TSFetchMetadataResp resp = new TSFetchMetadataResp();
@@ -802,7 +812,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
       return resp;
     } catch (Exception e) {
-      sessionManager.releaseQueryResourceNoExceptions(queryId);
+      releaseQueryResourceNoExceptions(queryId);
       throw e;
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1006,7 +1016,7 @@ public class TSServiceImpl implements TSIService.Iface {
                 req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
-          sessionManager.releaseQueryResourceNoExceptions(req.queryId);
+          releaseQueryResourceNoExceptions(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -1044,7 +1054,7 @@ public class TSServiceImpl implements TSIService.Iface {
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
     } catch (Exception e) {
-      sessionManager.releaseQueryResourceNoExceptions(req.queryId);
+      releaseQueryResourceNoExceptions(req.queryId);
       return RpcUtils.getTSFetchResultsResp(
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));

Reply via email to