This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch session-pool-optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 049c0984683e600464d7e0d5f4cb2276bdc0688c Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Aug 19 19:43:02 2021 +0800 optimize session creation --- .../org/apache/iotdb/session/pool/SessionPool.java | 140 ++++++++++----------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 79a539d..ed04509 100644 --- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -174,7 +174,6 @@ public class SessionPool { // if this method throws an exception, either the server is broken, or the ip/port/user/password // is incorrect. - @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning private Session getSession() throws IoTDBConnectionException { Session session = queue.poll(); @@ -183,91 +182,92 @@ public class SessionPool { } if (session != null) { return session; - } else { - long start = System.currentTimeMillis(); - boolean canCreate = false; + } + + boolean shouldCreate = false; + + long start = System.currentTimeMillis(); + while (session == null) { synchronized (this) { if (size < maxSize) { // we can create more session size++; - canCreate = true; + shouldCreate = true; // but we do it after skip synchronized block because connection a session is time // consuming. + break; } - } - if (canCreate) { - // create a new one. - if (logger.isDebugEnabled()) { - logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password); - } - session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader); + + // we have to wait for someone returns a session. try { - session.open(enableCompression); - // avoid someone has called close() the session pool - synchronized (this) { - if (closed) { - // have to release the connection... - session.close(); - throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); - } else { - return session; - } + if (logger.isDebugEnabled()) { + logger.debug("no more sessions can be created, wait... queue.size={}", queue.size()); } - } catch (IoTDBConnectionException e) { - // if exception, we will throw the exception. - // Meanwhile, we have to set size-- - synchronized (this) { - size--; - // we do not need to notifyAll as any waited thread can continue to work after waked up. - this.notify(); - if (logger.isDebugEnabled()) { - logger.debug("open session failed, reduce the count and notify others..."); + this.wait(1000); + long time = timeout < 60_000 ? timeout : 60_000; + if (System.currentTimeMillis() - start > time) { + logger.warn( + "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}", + (System.currentTimeMillis() - start) / 1000, + ip, + port, + user, + password); + logger.warn( + "current occupied size {}, queue size {}, considered size {} ", + occupied.size(), + queue.size(), + size); + if (System.currentTimeMillis() - start > timeout) { + throw new IoTDBConnectionException( + String.format("timeout to get a connection from %s:%s", ip, port)); } } - throw e; + } catch (InterruptedException e) { + logger.error("the SessionPool is damaged", e); + Thread.currentThread().interrupt(); } - } else { - while (session == null) { - synchronized (this) { - if (closed) { - throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); - } - // we have to wait for someone returns a session. - try { - if (logger.isDebugEnabled()) { - logger.debug( - "no more sessions can be created, wait... queue.size={}", queue.size()); - } - this.wait(1000); - long time = timeout < 60_000 ? timeout : 60_000; - if (System.currentTimeMillis() - start > time) { - logger.warn( - "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}", - (System.currentTimeMillis() - start) / 1000, - ip, - port, - user, - password); - logger.warn( - "current occupied size {}, queue size {}, considered size {} ", - occupied.size(), - queue.size(), - size); - if (System.currentTimeMillis() - start > timeout) { - throw new IoTDBConnectionException( - String.format("timeout to get a connection from %s:%s", ip, port)); - } - } - } catch (InterruptedException e) { - logger.error("the SessionPool is damaged", e); - Thread.currentThread().interrupt(); - } - session = queue.poll(); + + session = queue.poll(); + + if (closed) { + throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); + } + } + } + + if (shouldCreate) { + // create a new one. + if (logger.isDebugEnabled()) { + logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password); + } + session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader); + try { + session.open(enableCompression); + // avoid someone has called close() the session pool + synchronized (this) { + if (closed) { + // have to release the connection... + session.close(); + throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); } } - return session; + } catch (IoTDBConnectionException e) { + // if exception, we will throw the exception. + // Meanwhile, we have to set size-- + synchronized (this) { + size--; + // we do not need to notifyAll as any waited thread can continue to work after waked up. + this.notify(); + if (logger.isDebugEnabled()) { + logger.debug("open session failed, reduce the count and notify others..."); + } + } + throw e; } } + + return session; } public int currentAvailableSize() {
