This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/new-rc1.0.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 771f2b69f803155d6fa9b4f76622858b6cfc179b Author: Haonan <[email protected]> AuthorDate: Wed Feb 8 19:44:41 2023 +0800 [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017) --- .../java/org/apache/iotdb/isession/ISession.java | 7 +++++ .../java/org/apache/iotdb/session/Session.java | 31 ++++++++++++++++++++-- .../org/apache/iotdb/session/pool/SessionPool.java | 16 +++++++++-- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java index 922e78d0bb..4ab78f128f 100644 --- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java +++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.isession; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.isession.template.Template; import org.apache.iotdb.isession.util.SystemStatus; import org.apache.iotdb.isession.util.Version; @@ -54,6 +55,12 @@ public interface ISession extends AutoCloseable { void open(boolean enableRPCCompression, int connectionTimeoutInMs) throws IoTDBConnectionException; + void open( + boolean enableRPCCompression, + int connectionTimeoutInMs, + Map<String, TEndPoint> deviceIdToEndpoint) + throws IoTDBConnectionException; + void close() throws IoTDBConnectionException; String getTimeZone(); diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 9638238c3d..0298785756 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -404,6 +404,28 @@ public class Session implements ISession { } } + @Override + public synchronized void open( + boolean enableRPCCompression, + int connectionTimeoutInMs, + Map<String, TEndPoint> deviceIdToEndpoint) + throws IoTDBConnectionException { + if (!isClosed) { + return; + } + + this.enableRPCCompression = enableRPCCompression; + this.connectionTimeoutInMs = connectionTimeoutInMs; + defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId); + defaultSessionConnection.setEnableRedirect(enableQueryRedirection); + isClosed = false; + if (enableRedirection || enableQueryRedirection) { + this.deviceIdToEndpoint = deviceIdToEndpoint; + endPointToSessionConnection = new ConcurrentHashMap<>(); + endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection); + } + } + @Override public synchronized void close() throws IoTDBConnectionException { if (isClosed) { @@ -920,7 +942,8 @@ public class Session implements ISession { TEndPoint endPoint; if (enableRedirection && !deviceIdToEndpoint.isEmpty() - && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) { + && (endPoint = deviceIdToEndpoint.get(deviceId)) != null + && endPointToSessionConnection.containsKey(endPoint)) { return endPointToSessionConnection.get(endPoint); } else { return defaultSessionConnection; @@ -965,7 +988,10 @@ public class Session implements ISession { return; } AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>(); - deviceIdToEndpoint.put(deviceId, endpoint); + if (!deviceIdToEndpoint.containsKey(deviceId) + || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) { + deviceIdToEndpoint.put(deviceId, endpoint); + } SessionConnection connection = endPointToSessionConnection.computeIfAbsent( endpoint, @@ -3259,6 +3285,7 @@ public class Session implements ISession { completableFuture.join(); } catch (CompletionException completionException) { Throwable cause = completionException.getCause(); + logger.error("Meet error when async insert!", cause); if (cause instanceof IoTDBConnectionException) { throw (IoTDBConnectionException) cause; } else { diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 12661ec21d..ed89b085bf 100644 --- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.session.pool; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.isession.ISession; import org.apache.iotdb.isession.ISessionDataSet; import org.apache.iotdb.isession.SessionConfig; @@ -93,6 +94,8 @@ public class SessionPool implements ISessionPool { private boolean enableRedirection; private boolean enableQueryRedirection = false; + private Map<String, TEndPoint> deviceIdToEndpoint; + private int thriftDefaultBufferSize; private int thriftMaxFrameSize; @@ -299,6 +302,9 @@ public class SessionPool implements ISessionPool { this.enableCompression = enableCompression; this.zoneId = zoneId; this.enableRedirection = enableRedirection; + if (this.enableRedirection) { + deviceIdToEndpoint = new ConcurrentHashMap<>(); + } this.connectionTimeoutInMs = connectionTimeoutInMs; this.version = version; this.thriftDefaultBufferSize = thriftDefaultBufferSize; @@ -330,6 +336,9 @@ public class SessionPool implements ISessionPool { this.enableCompression = enableCompression; this.zoneId = zoneId; this.enableRedirection = enableRedirection; + if (this.enableRedirection) { + deviceIdToEndpoint = new ConcurrentHashMap<>(); + } this.connectionTimeoutInMs = connectionTimeoutInMs; this.version = version; this.thriftDefaultBufferSize = thriftDefaultBufferSize; @@ -448,7 +457,7 @@ public class SessionPool implements ISessionPool { session = constructNewSession(); try { - session.open(enableCompression, connectionTimeoutInMs); + session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint); // avoid someone has called close() the session pool synchronized (this) { if (closed) { @@ -548,7 +557,7 @@ public class SessionPool implements ISessionPool { private void tryConstructNewSession() { Session session = constructNewSession(); try { - session.open(enableCompression, connectionTimeoutInMs); + session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint); // avoid someone has called close() the session pool synchronized (this) { if (closed) { @@ -2639,6 +2648,9 @@ public class SessionPool implements ISessionPool { @Override public void setEnableRedirection(boolean enableRedirection) { this.enableRedirection = enableRedirection; + if (this.enableRedirection) { + deviceIdToEndpoint = new ConcurrentHashMap<>(); + } for (ISession session : queue) { session.setEnableRedirection(enableRedirection); }
