This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 257d9aabac0 [IOTDB-6327] Random choosing available nodes to send sql requests
257d9aabac0 is described below

commit 257d9aabac0ebd68c7ab1525448f8d82e6aeac32
Author: Jackie Tien <[email protected]>
AuthorDate: Wed May 15 12:11:18 2024 +0800

    [IOTDB-6327] Random choosing available nodes to send sql requests
---
 .../org/apache/iotdb/isession/INodeSupplier.java   |  3 +++
 .../apache/iotdb/session/DummyNodesSupplier.java   | 12 ++++++++++
 .../org/apache/iotdb/session/NodesSupplier.java    | 26 ++++++++++++++++------
 .../apache/iotdb/session/QueryEndPointPolicy.java} |  7 +++---
 .../apache/iotdb/session/RoundRobinPolicy.java}    | 17 ++++++++++----
 .../java/org/apache/iotdb/session/Session.java     | 22 +++++++++++++++++-
 6 files changed, 71 insertions(+), 16 deletions(-)

diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
index 371630a6b85..60a2a5adb43 100644
--- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
+++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
@@ -22,9 +22,12 @@ package org.apache.iotdb.isession;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
 public interface INodeSupplier extends Supplier<List<TEndPoint>> {
 
   void close();
+
+  Optional<TEndPoint> getQueryEndPoint();
 }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java
index 64a7c87dcf2..bb3186d4200 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/DummyNodesSupplier.java
@@ -24,11 +24,14 @@ import org.apache.iotdb.isession.INodeSupplier;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 public class DummyNodesSupplier implements INodeSupplier {
 
   private final List<TEndPoint> availableNodes;
 
+  private final QueryEndPointPolicy policy = new RoundRobinPolicy();
+
   public DummyNodesSupplier(List<TEndPoint> availableNodes) {
     this.availableNodes = Collections.unmodifiableList(availableNodes);
   }
@@ -38,6 +41,15 @@ public class DummyNodesSupplier implements INodeSupplier {
     // do nothing
   }
 
+  @Override
+  public Optional<TEndPoint> getQueryEndPoint() {
+    if (availableNodes == null || availableNodes.isEmpty()) {
+      return Optional.empty();
+    } else {
+      return Optional.of(policy.chooseOne(availableNodes));
+    }
+  }
+
   @Override
   public List<TEndPoint> get() {
     return availableNodes;
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
index 8f0244bbcab..37532023ce6 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
@@ -28,9 +28,10 @@ import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -55,9 +56,9 @@ public class NodesSupplier implements INodeSupplier, Runnable {
 
   private static final int FETCH_SIZE = 10_000;
 
-  // availableNodes won't be updated frequently, so we use CopyOnWriteArraySet which is thread-safe
+  // availableNodes won't be updated frequently, so we use CopyOnWriteArrayList which is thread-safe
   // and is optimized for scenarios of reading more and writing less
-  private volatile Set<TEndPoint> availableNodes = new CopyOnWriteArraySet<>();
+  private volatile List<TEndPoint> availableNodes = new CopyOnWriteArrayList<>();
 
   private final boolean useSSL;
   private final String trustStore;
@@ -77,6 +78,8 @@ public class NodesSupplier implements INodeSupplier, Runnable {
 
   private final String version;
 
+  private final QueryEndPointPolicy policy = new RoundRobinPolicy();
+
   private ThriftConnection client;
 
   private volatile boolean closed = false;
@@ -133,7 +136,7 @@ public class NodesSupplier implements INodeSupplier, Runnable {
       String trustStorePwd,
       boolean enableRPCCompression,
       String version) {
-    this.availableNodes.addAll(endPointList);
+    this.availableNodes.addAll(new HashSet<>(endPointList));
     this.userName = userName;
     this.password = password;
     this.useSSL = useSSL;
@@ -151,7 +154,7 @@ public class NodesSupplier implements INodeSupplier, Runnable {
   // and the List needn't be thread-safe, because it will only be used in one thread.
   @Override
   public List<TEndPoint> get() {
-    return new ArrayList<>(availableNodes);
+    return availableNodes;
   }
 
   @Override
@@ -213,6 +216,15 @@ public class NodesSupplier implements INodeSupplier, Runnable {
     destroyCurrentClient();
   }
 
+  @Override
+  public Optional<TEndPoint> getQueryEndPoint() {
+    if (availableNodes == null || availableNodes.isEmpty()) {
+      return Optional.empty();
+    } else {
+      return Optional.of(policy.chooseOne(get()));
+    }
+  }
+
   private boolean updateDataNodeList() {
     try (SessionDataSet sessionDataSet =
         client.executeQueryStatement(SHOW_DATA_NODES_COMMAND, TIMEOUT_IN_MS, FETCH_SIZE)) {
@@ -231,7 +243,7 @@ public class NodesSupplier implements INodeSupplier, Runnable {
       }
       // replace the older ones.
       if (!res.isEmpty()) {
-        availableNodes = new CopyOnWriteArraySet<>(res);
+        availableNodes = res;
       }
       return true;
     } catch (Exception e) {
diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java
similarity index 85%
copy from iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
copy to iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java
index 371630a6b85..9554efda8de 100644
--- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/QueryEndPointPolicy.java
@@ -17,14 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.isession;
+package org.apache.iotdb.session;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 import java.util.List;
-import java.util.function.Supplier;
 
-public interface INodeSupplier extends Supplier<List<TEndPoint>> {
+public interface QueryEndPointPolicy {
 
-  void close();
+  TEndPoint chooseOne(List<TEndPoint> endPointList);
 }
diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java
similarity index 72%
copy from iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
copy to iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java
index 371630a6b85..f99827cc33e 100644
--- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/INodeSupplier.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/RoundRobinPolicy.java
@@ -17,14 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.isession;
+package org.apache.iotdb.session;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 import java.util.List;
-import java.util.function.Supplier;
 
-public interface INodeSupplier extends Supplier<List<TEndPoint>> {
+public class RoundRobinPolicy implements QueryEndPointPolicy {
 
-  void close();
+  private int index = 0;
+
+  @Override
+  public TEndPoint chooseOne(List<TEndPoint> endPointList) {
+    int tmp = index;
+    if (tmp >= endPointList.size()) {
+      tmp = 0;
+    }
+    index = tmp + 1;
+    return endPointList.get(tmp);
+  }
 }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 626578c4f83..c984744f09c 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -89,6 +89,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -888,7 +889,7 @@ public class Session implements ISession {
   private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
       throws StatementExecutionException, IoTDBConnectionException {
     try {
-      return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+      return getQuerySessionConnection().executeQueryStatement(sql, timeoutInMs);
     } catch (RedirectException e) {
       handleQueryRedirection(e.getEndPoint());
       if (enableQueryRedirection) {
@@ -905,6 +906,25 @@ public class Session implements ISession {
     }
   }
 
+  private SessionConnection getQuerySessionConnection() {
+    Optional<TEndPoint> endPoint =
+        availableNodes == null ? Optional.empty() : availableNodes.getQueryEndPoint();
+    if (!endPoint.isPresent() || endPointToSessionConnection == null) {
+      return defaultSessionConnection;
+    }
+    SessionConnection connection =
+        endPointToSessionConnection.computeIfAbsent(
+            endPoint.get(),
+            k -> {
+              try {
+                return constructSessionConnection(this, endPoint.get(), zoneId);
+              } catch (IoTDBConnectionException ex) {
+                return null;
+              }
+            });
+    return connection == null ? defaultSessionConnection : connection;
+  }
+
   /**
    * execute non query statement
    *

Reply via email to