This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch memoryleak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4af55d463af5a3b71a00ccd8f54f0e0ca427b2e9 Author: Alima777 <[email protected]> AuthorDate: Mon Nov 8 16:44:35 2021 +0800 Fix memory leak --- .../apache/iotdb/cluster/server/ClientServer.java | 2 +- .../iotdb/db/query/control/SessionManager.java | 23 +++++++-------------- .../org/apache/iotdb/db/service/TSServiceImpl.java | 24 +++++++++++++++------- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java index 36de122..ec2c523 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java @@ -302,7 +302,7 @@ public class ClientServer extends TSServiceImpl { * @throws StorageEngineException */ @Override - protected void releaseQueryResource(long queryId) throws StorageEngineException { + public void releaseQueryResource(long queryId) throws StorageEngineException { // release resources locally super.releaseQueryResource(queryId); // release resources remotely diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index c3257fb..9fc5ed5 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.query.dataset.UDTFDataSet; +import org.apache.iotdb.db.service.TSServiceImpl; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; @@ -85,7 +86,7 @@ public class SessionManager { return sessionId; } - public boolean releaseSessionResource(long sessionId) { + public boolean releaseSessionResource(TSServiceImpl service, long sessionId) { sessionIdToZoneId.remove(sessionId); Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId); @@ -94,7 +95,7 @@ public class SessionManager { Set<Long> queryIdSet = statementIdToQueryId.remove(statementId); if (queryIdSet != null) { for (Long queryId : queryIdSet) { - releaseQueryResourceNoExceptions(queryId); + service.releaseQueryResourceNoExceptions(queryId); } } } @@ -111,11 +112,11 @@ public class SessionManager { return statementId; } - public void closeStatement(long sessionId, long statementId) { + public void closeStatement(TSServiceImpl tsService, long sessionId, long statementId) { Set<Long> queryIdSet = statementIdToQueryId.remove(statementId); if (queryIdSet != null) { for (Long queryId : queryIdSet) { - releaseQueryResourceNoExceptions(queryId); + tsService.releaseQueryResourceNoExceptions(queryId); } } @@ -144,16 +145,6 @@ public class SessionManager { QueryResourceManager.getInstance().endQuery(queryId); } - public void releaseQueryResourceNoExceptions(long queryId) { - if (queryId != -1) { - try { - releaseQueryResource(queryId); - } catch (Exception e) { - LOGGER.warn("Error occurred while releasing query resource: ", e); - } - } - } - public String getUsername(Long sessionId) { return sessionIdToUsername.get(sessionId); } @@ -182,8 +173,8 @@ public class SessionManager { queryIdToDataSet.remove(queryId); } - public void closeDataset(Long statementId, Long queryId) { - releaseQueryResourceNoExceptions(queryId); + public void closeDataset(TSServiceImpl tsService, Long statementId, Long queryId) { + tsService.releaseQueryResourceNoExceptions(queryId); if (statementIdToQueryId.containsKey(statementId)) { statementIdToQueryId.get(statementId).remove(queryId); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index b36a5ee..813e93c 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -281,7 +281,7 @@ public class TSServiceImpl implements TSIService.Iface { sessionManager.removeCurrSessionId(); return new TSStatus( - !sessionManager.releaseSessionResource(sessionId) + !sessionManager.releaseSessionResource(this, sessionId) ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR) : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } @@ -308,9 +308,9 @@ public class TSServiceImpl implements TSIService.Iface { try { if (req.isSetStatementId()) { if (req.isSetQueryId()) { - sessionManager.closeDataset(req.statementId, req.queryId); + sessionManager.closeDataset(this, req.statementId, req.queryId); } else { - sessionManager.closeStatement(req.sessionId, req.statementId); + sessionManager.closeStatement(this, req.sessionId, req.statementId); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } else { @@ -324,10 +324,20 @@ public class TSServiceImpl implements TSIService.Iface { } /** release single operation resource */ - protected void releaseQueryResource(long queryId) throws StorageEngineException { + public void releaseQueryResource(long queryId) throws StorageEngineException { sessionManager.releaseQueryResource(queryId); } + public void releaseQueryResourceNoExceptions(long queryId) { + if (queryId != -1) { + try { + releaseQueryResource(queryId); + } catch (Exception e) { + LOGGER.warn("Error occurred while releasing query resource: ", e); + } + } + } + @Override public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) { TSFetchMetadataResp resp = new TSFetchMetadataResp(); @@ -805,7 +815,7 @@ public class TSServiceImpl implements TSIService.Iface { } return resp; } catch (Exception e) { - sessionManager.releaseQueryResourceNoExceptions(queryId); + releaseQueryResourceNoExceptions(queryId); throw e; } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime); @@ -1009,7 +1019,7 @@ public class TSServiceImpl implements TSIService.Iface { req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId)); boolean hasResultSet = result.bufferForTime().limit() != 0; if (!hasResultSet) { - sessionManager.releaseQueryResourceNoExceptions(req.queryId); + releaseQueryResourceNoExceptions(req.queryId); } TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); resp.setHasResultSet(hasResultSet); @@ -1047,7 +1057,7 @@ public class TSServiceImpl implements TSIService.Iface { onNPEOrUnexpectedException( e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR)); } catch (Exception e) { - sessionManager.releaseQueryResourceNoExceptions(req.queryId); + releaseQueryResourceNoExceptions(req.queryId); return RpcUtils.getTSFetchResultsResp( onNPEOrUnexpectedException( e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
