This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-6327 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b245c2a03c549f5a23b8f5dbb179747a090a0cdf Author: JackieTien97 <[email protected]> AuthorDate: Sat May 11 16:45:23 2024 +0800 [IOTDB-6327] Random choosing available nodes to send sql requests --- .../java/org/apache/iotdb/isession/INodeSupplier.java | 3 +++ .../org/apache/iotdb/session/DummyNodesSupplier.java | 12 ++++++++++++ .../java/org/apache/iotdb/session/NodesSupplier.java | 12 ++++++++++++ .../org/apache/iotdb/session/QueryEndPointPolicy.java} | 7 +++---- .../org/apache/iotdb/session/RoundRobinPolicy.java} | 17 +++++++++++++---- .../src/main/java/org/apache/iotdb/session/Session.java | 17 ++++++++++++++++- .../apache/iotdb/session/SessionCacheLeaderTest.java | 0 .../org/apache/iotdb/session/SessionConnectionTest.java | 0 .../test/java/org/apache/iotdb/session/SessionTest.java | 0 .../test/java/org/apache/iotdb/session/TabletTest.java | 0 .../iotdb/session/pool/SessionPoolExceptionTest.java | 0 .../org/apache/iotdb/session/pool/SessionPoolTest.java | 0 .../org/apache/iotdb/session/util/SessionUtilsTest.java | 0 .../org/apache/iotdb/session/util/ThreadUtilsTest.java | 0 .../session}/test/resources/iotdb-common.properties | 0 .../session}/test/resources/iotdb-datanode.properties | 0 .../apache/iotdb/session}/test/resources/logback.xml | 0 17 files changed, 59 insertions(+), 9 deletions(-) diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java index 371630a6b85..60a2a5adb43 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java @@ -22,9 +22,12 @@ package org.apache.iotdb.isession; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; public interface INodeSupplier extends Supplier<List<TEndPoint>> { void close(); + + Optional<TEndPoint> getQueryEndPoint(); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java index 64a7c87dcf2..bb3186d4200 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java @@ -24,11 +24,14 @@ import org.apache.iotdb.isession.INodeSupplier; import java.util.Collections; import java.util.List; +import java.util.Optional; public class DummyNodesSupplier implements INodeSupplier { private final List<TEndPoint> availableNodes; + private final QueryEndPointPolicy policy = new RoundRobinPolicy(); + public DummyNodesSupplier(List<TEndPoint> availableNodes) { this.availableNodes = Collections.unmodifiableList(availableNodes); } @@ -38,6 +41,15 @@ public class DummyNodesSupplier implements INodeSupplier { // do nothing } + @Override + public Optional<TEndPoint> getQueryEndPoint() { + if (availableNodes == null || availableNodes.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(policy.chooseOne(availableNodes)); + } + } + @Override public List<TEndPoint> get() { return availableNodes; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java index 8f0244bbcab..d674aebb49b 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.time.ZoneId; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledExecutorService; @@ -77,6 +78,8 @@ public class NodesSupplier implements INodeSupplier, Runnable { private final String version; + private final QueryEndPointPolicy policy = new RoundRobinPolicy(); + private ThriftConnection client; private volatile boolean closed = false; @@ -213,6 +216,15 @@ public class NodesSupplier implements INodeSupplier, Runnable { destroyCurrentClient(); } + @Override + public Optional<TEndPoint> getQueryEndPoint() { + if (availableNodes == null || availableNodes.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(policy.chooseOne(get())); + } + } + private boolean updateDataNodeList() { try (SessionDataSet sessionDataSet = client.executeQueryStatement(SHOW_DATA_NODES_COMMAND, TIMEOUT_IN_MS, FETCH_SIZE)) { diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java similarity index 85% copy from iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java copy to iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java index 371630a6b85..9554efda8de 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java @@ -17,14 +17,13 @@ * under the License. */ -package org.apache.iotdb.isession; +package org.apache.iotdb.session; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import java.util.List; -import java.util.function.Supplier; -public interface INodeSupplier extends Supplier<List<TEndPoint>> { +public interface QueryEndPointPolicy { - void close(); + TEndPoint chooseOne(List<TEndPoint> endPointList); } diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java similarity index 72% copy from iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java copy to iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java index 371630a6b85..f99827cc33e 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java @@ -17,14 +17,23 @@ * under the License. */ -package org.apache.iotdb.isession; +package org.apache.iotdb.session; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import java.util.List; -import java.util.function.Supplier; -public interface INodeSupplier extends Supplier<List<TEndPoint>> { +public class RoundRobinPolicy implements QueryEndPointPolicy { - void close(); + private int index = 0; + + @Override + public TEndPoint chooseOne(List<TEndPoint> endPointList) { + int tmp = index; + if (tmp >= endPointList.size()) { + tmp = 0; + } + index = tmp + 1; + return endPointList.get(tmp); + } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 6c0a42e159c..32cb6b471cd 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -887,7 +887,7 @@ public class Session implements ISession { private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs) throws StatementExecutionException, IoTDBConnectionException { try { - return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs); + return getQuerySessionConnection().executeQueryStatement(sql, timeoutInMs); } catch (RedirectException e) { handleQueryRedirection(e.getEndPoint()); if (enableQueryRedirection) { @@ -904,6 +904,21 @@ public class Session implements ISession { } } + private SessionConnection getQuerySessionConnection() { + TEndPoint endPoint = availableNodes.getQueryEndPoint(); + SessionConnection connection = + endPointToSessionConnection.computeIfAbsent( + endPoint, + k -> { + try { + return constructSessionConnection(this, endPoint, zoneId); + } catch (IoTDBConnectionException ex) { + return null; + } + }); + return connection == null ? defaultSessionConnection : connection; + } + /** * execute non query statement * diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/SessionConnectionTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/SessionConnectionTest.java diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/SessionTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/SessionTest.java diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/TabletTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/TabletTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/TabletTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/TabletTest.java diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolExceptionTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/pool/SessionPoolExceptionTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolExceptionTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/pool/SessionPoolExceptionTest.java diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/util/SessionUtilsTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/util/SessionUtilsTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/util/SessionUtilsTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/util/SessionUtilsTest.java diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java similarity index 100% rename from iotdb-client/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java diff --git a/iotdb-client/session/src/test/resources/iotdb-common.properties b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/resources/iotdb-common.properties similarity index 100% rename from iotdb-client/session/src/test/resources/iotdb-common.properties rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/resources/iotdb-common.properties diff --git a/iotdb-client/session/src/test/resources/iotdb-datanode.properties b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/resources/iotdb-datanode.properties similarity index 100% rename from iotdb-client/session/src/test/resources/iotdb-datanode.properties rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/resources/iotdb-datanode.properties diff --git a/iotdb-client/session/src/test/resources/logback.xml b/iotdb-client/session/src/main/java/org/apache/iotdb/session/test/resources/logback.xml similarity index 100% rename from iotdb-client/session/src/test/resources/logback.xml rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/test/resources/logback.xml
