This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch memoryleakmaster in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 01b6db981944e9bfeee68b268d2be93121d16f6d Author: Alima777 <[email protected]> AuthorDate: Mon Nov 8 21:44:39 2021 +0800 Add cluster session manager to release query resource --- .../query/manage/ClusterSessionManager.java | 127 +++++++++++++++++++++ .../apache/iotdb/cluster/server/ClientServer.java | 71 +----------- .../iotdb/cluster/server/MetaClusterServer.java | 16 ++- .../iotdb/db/query/control/SessionManager.java | 2 +- .../db/query/control/SessionTimeoutManager.java | 5 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 11 +- 6 files changed, 155 insertions(+), 77 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java new file mode 100644 index 0000000..6cd8be3 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterSessionManager.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.query.manage; + +import org.apache.iotdb.cluster.client.async.AsyncDataClient; +import org.apache.iotdb.cluster.client.sync.SyncDataClient; +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.coordinator.Coordinator; +import org.apache.iotdb.cluster.query.RemoteQueryContext; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; +import org.apache.iotdb.cluster.server.RaftServer; +import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.query.control.SessionManager; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +public class ClusterSessionManager extends SessionManager { + + protected ClusterSessionManager() { + // singleton + } + + private static final Logger logger = LoggerFactory.getLogger(ClusterSessionManager.class); + + /** + * The Coordinator of the local node. Through this node ClientServer queries data and meta from + * the cluster and performs data manipulations to the cluster. + */ + private Coordinator coordinator; + + public void setCoordinator(Coordinator coordinator) { + this.coordinator = coordinator; + } + + /** + * queryId -> queryContext map. When a query ends either normally or accidentally, the resources + * used by the query can be found in the context and then released. + */ + private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>(); + + public void putContext(long queryId, RemoteQueryContext context) { + queryContextMap.put(queryId, context); + } + + public void releaseQueryResource(long queryId) throws StorageEngineException { + super.releaseQueryResource(queryId); + this.releaseRemoteQueryResource(queryId); + } + + /** Release remote resources used by a query. */ + public void releaseRemoteQueryResource(long queryId) { + // release resources remotely + RemoteQueryContext context = queryContextMap.remove(queryId); + if (context != null) { + // release the resources in every queried node + for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) { + RaftNode header = headerEntry.getKey(); + Set<Node> queriedNodes = headerEntry.getValue(); + + for (Node queriedNode : queriedNodes) { + GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>()); + try { + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + AsyncDataClient client = + coordinator.getAsyncDataClient( + queriedNode, RaftServer.getReadOperationTimeoutMS()); + client.endQuery(header, coordinator.getThisNode(), queryId, handler); + } else { + try (SyncDataClient syncDataClient = + coordinator.getSyncDataClient( + queriedNode, RaftServer.getReadOperationTimeoutMS())) { + try { + syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); + } catch (TException e) { + // the connection may be broken, close it to avoid it being reused + syncDataClient.getInputProtocol().getTransport().close(); + throw e; + } + } + } + } catch (IOException | TException e) { + logger.error("Cannot end query {} in {}", queryId, queriedNode); + } + } + } + } + } + + public static ClusterSessionManager getInstance() { + return ClusterSessionManagerHolder.INSTANCE; + } + + private static class ClusterSessionManagerHolder { + + private ClusterSessionManagerHolder() {} + + private static final ClusterSessionManager INSTANCE = new ClusterSessionManager(); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java index 86e5fd2..8876329 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java @@ -19,20 +19,15 @@ package org.apache.iotdb.cluster.server; -import org.apache.iotdb.cluster.client.async.AsyncDataClient; -import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.coordinator.Coordinator; import org.apache.iotdb.cluster.query.ClusterPlanExecutor; import org.apache.iotdb.cluster.query.ClusterPlanner; import org.apache.iotdb.cluster.query.RemoteQueryContext; -import org.apache.iotdb.cluster.rpc.thrift.Node; -import org.apache.iotdb.cluster.rpc.thrift.RaftNode; -import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; +import org.apache.iotdb.cluster.query.manage.ClusterSessionManager; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; @@ -46,7 +41,6 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSIService.Processor; import org.apache.iotdb.service.rpc.thrift.TSStatus; -import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; @@ -62,19 +56,13 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; /** * ClientServer is the cluster version of TSServiceImpl, which is responsible for the processing of @@ -106,16 +94,12 @@ public class ClientServer extends TSServiceImpl { /** The socket poolServer will listen to. Async service requires nonblocking socket */ private TServerTransport serverTransport; - /** - * queryId -> queryContext map. When a query ends either normally or accidentally, the resources - * used by the query can be found in the context and then released. - */ - private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>(); - public ClientServer(MetaGroupMember metaGroupMember) throws QueryProcessException { super(); this.processor = new ClusterPlanner(); this.executor = new ClusterPlanExecutor(metaGroupMember); + TSServiceImpl.setSessionManager(ClusterSessionManager.getInstance()); + ((ClusterSessionManager) sessionManager).setCoordinator(coordinator); } /** @@ -258,54 +242,7 @@ public class ClientServer extends TSServiceImpl { long queryId, boolean debug, long startTime, String statement, long timeout) { RemoteQueryContext context = new RemoteQueryContext(queryId, debug, startTime, statement, timeout); - queryContextMap.put(queryId, context); + ClusterSessionManager.getInstance().putContext(queryId, context); return context; } - - /** - * Release the local and remote resources used by a query. - * - * @param queryId - * @throws StorageEngineException - */ - @Override - protected void releaseQueryResource(long queryId) throws StorageEngineException { - // release resources locally - super.releaseQueryResource(queryId); - // release resources remotely - RemoteQueryContext context = queryContextMap.remove(queryId); - if (context != null) { - // release the resources in every queried node - for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) { - RaftNode header = headerEntry.getKey(); - Set<Node> queriedNodes = headerEntry.getValue(); - - for (Node queriedNode : queriedNodes) { - GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>()); - try { - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { - AsyncDataClient client = - coordinator.getAsyncDataClient( - queriedNode, RaftServer.getReadOperationTimeoutMS()); - client.endQuery(header, coordinator.getThisNode(), queryId, handler); - } else { - try (SyncDataClient syncDataClient = - coordinator.getSyncDataClient( - queriedNode, RaftServer.getReadOperationTimeoutMS())) { - try { - syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); - } catch (TException e) { - // the connection may be broken, close it to avoid it being reused - syncDataClient.getInputProtocol().getTransport().close(); - throw e; - } - } - } - } catch (IOException | TException e) { - logger.error("Cannot end query {} in {}", queryId, queriedNode); - } - } - } - } - } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index ec19cad..e295e13 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java @@ -24,7 +24,21 @@ import org.apache.iotdb.cluster.exception.ConfigInconsistentException; import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; import org.apache.iotdb.cluster.metadata.CMManager; import org.apache.iotdb.cluster.metadata.MetaPuller; -import org.apache.iotdb.cluster.rpc.thrift.*; +import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; +import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; +import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; +import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; +import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; +import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; +import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; +import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus; +import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus; +import org.apache.iotdb.cluster.rpc.thrift.TSMetaService; import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor; import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor; import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer; diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index c479953..b22f21f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -54,7 +54,7 @@ public class SessionManager { // (queryId -> QueryDataSet) private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>(); - private SessionManager() { + protected SessionManager() { // singleton } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java index 6741c05..92e55df 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.service.TSServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,10 +69,10 @@ public class SessionTimeoutManager { public boolean unregister(long sessionId) { if (SESSION_TIMEOUT == 0) { - return SessionManager.getInstance().releaseSessionResource(sessionId); + return TSServiceImpl.sessionManager.releaseSessionResource(sessionId); } - if (SessionManager.getInstance().releaseSessionResource(sessionId)) { + if (TSServiceImpl.sessionManager.releaseSessionResource(sessionId)) { return sessionIdToLastActiveTime.remove(sessionId) != null; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 0f1158d..591ce66 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -196,7 +196,7 @@ public class TSServiceImpl implements TSIService.Iface { private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE); private static final AtomicInteger queryCount = new AtomicInteger(0); private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance(); - private final SessionManager sessionManager = SessionManager.getInstance(); + public static SessionManager sessionManager = SessionManager.getInstance(); private final TracingManager tracingManager = TracingManager.getInstance(); private long startTime = -1L; @@ -300,6 +300,10 @@ public class TSServiceImpl implements TSIService.Iface { : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } + public static void setSessionManager(SessionManager sessionManager) { + TSServiceImpl.sessionManager = sessionManager; + } + @Override public TSStatus cancelOperation(TSCancelOperationReq req) { // TODO implement @@ -337,11 +341,6 @@ public class TSServiceImpl implements TSIService.Iface { } } - /** release single operation resource */ - protected void releaseQueryResource(long queryId) throws StorageEngineException { - sessionManager.releaseQueryResource(queryId); - } - @Override public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) { TSFetchMetadataResp resp = new TSFetchMetadataResp();
