This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-6199
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-6199 by this push:
     new 7447af76882 self test
7447af76882 is described below

commit 7447af768822d7c7f5fc01c7a6ca346e6fa1ae64
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Dec 12 16:05:00 2023 +0800

    self test
---
 .../org/apache/iotdb/session/NodesSupplier.java    | 16 +++--
 .../java/org/apache/iotdb/session/Session.java     | 19 +++++-
 .../apache/iotdb/session/SessionConnection.java    |  1 +
 .../org/apache/iotdb/session/ThriftConnection.java |  2 +-
 .../org/apache/iotdb/session/pool/SessionPool.java | 70 ++++++++++++++++------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  3 +-
 .../datanode1conf/iotdb-datanode.properties        |  2 +-
 .../datanode2conf/iotdb-datanode.properties        |  2 +-
 .../datanode3conf/iotdb-datanode.properties        |  2 +-
 9 files changed, 86 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
index e48bdca2ab0..47f7270511d 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
@@ -138,7 +138,7 @@ public class NodesSupplier implements INodeSupplier, 
Runnable {
     this.trustStore = trustStore;
     this.trustStorePwd = trustStorePwd;
     this.enableRPCCompression = enableRPCCompression;
-    this.zoneId = zoneId;
+    this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
     this.thriftMaxFrameSize = thriftMaxFrameSize;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
@@ -183,7 +183,7 @@ public class NodesSupplier implements INodeSupplier, 
Runnable {
           version);
       return true;
     } catch (Exception e) {
-      LOGGER.warn("Failed to create connection with {}.", endPoint, e);
+      LOGGER.warn("Failed to create connection with {}.", endPoint);
       close();
       return false;
     }
@@ -201,8 +201,10 @@ public class NodesSupplier implements INodeSupplier, 
Runnable {
       SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
       List<TEndPoint> res = new ArrayList<>();
       while (iterator.next()) {
-        if (RUNNING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
-          String ip = iterator.getString(IP_COLUMN_NAME);
+        String ip = iterator.getString(IP_COLUMN_NAME);
+        // ignore 0.0.0.0 and not running DN
+        if (RUNNING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))
+            && !"0.0.0.0".equals(ip)) {
           String port = iterator.getString(PORT_COLUMN_NAME);
           if (ip != null && port != null) {
             res.add(new TEndPoint(ip, Integer.parseInt(port)));
@@ -210,10 +212,12 @@ public class NodesSupplier implements INodeSupplier, 
Runnable {
         }
       }
       // replace the older ones.
-      availableNodes = new CopyOnWriteArraySet<>(res);
+      if (!res.isEmpty()) {
+        availableNodes = new CopyOnWriteArraySet<>(res);
+      }
       return true;
     } catch (Exception e) {
-      LOGGER.warn("Failed to fetch data node list from {}.", client.endPoint, 
e);
+      LOGGER.warn("Failed to fetch data node list from {}.", client.endPoint);
       return false;
     }
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 142aea1f4fb..af6166f4103 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -459,7 +459,7 @@ public class Session implements ISession {
       this.availableNodes = null;
     }
 
-    this.executorService = Executors.newSingleThreadScheduledExecutor();
+    initThreadPool();
     this.availableNodes =
         NodesSupplier.createNodeSupplier(
             getNodeUrls(),
@@ -487,6 +487,23 @@ public class Session implements ISession {
     }
   }
 
+  private void initThreadPool() {
+    this.executorService =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread t =
+                  new Thread(
+                      Thread.currentThread().getThreadGroup(), r, 
"PeriodicalUpdateDNList", 0);
+              if (!t.isDaemon()) {
+                t.setDaemon(true);
+              }
+              if (t.getPriority() != Thread.NORM_PRIORITY) {
+                t.setPriority(Thread.NORM_PRIORITY);
+              }
+              return t;
+            });
+  }
+
   private List<TEndPoint> getNodeUrls() {
     if (defaultEndPoint != null) {
       return Collections.singletonList(defaultEndPoint);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index a4a2bedbacd..fdda3c31fb7 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -966,6 +966,7 @@ public class SessionConnection {
     for (int i = 1; i <= SessionConfig.RETRY_NUM; i++) {
       if (transport != null) {
         transport.close();
+        endPointList = availableNodes.get();
         int currHostIndex = random.nextInt(endPointList.size());
         int tryHostNum = 0;
         for (int j = currHostIndex; j < endPointList.size(); j++) {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
index e867f961f14..91f055f55b5 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
@@ -180,7 +180,7 @@ public class ThriftConnection {
       try {
         client.closeSession(new TSCloseSessionReq(sessionId));
       } catch (TException e) {
-        LOGGER.warn("Closing Session-{} with {} failed", sessionId, endPoint, 
e);
+        LOGGER.warn("Closing Session-{} with {} failed.", sessionId, endPoint);
         if (transport.isOpen()) {
           transport.close();
         }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 902c8ded417..ac8b90c34e5 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -367,7 +367,7 @@ public class SessionPool implements ISessionPool {
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
     this.thriftMaxFrameSize = thriftMaxFrameSize;
     this.formattedNodeUrls = String.format("%s:%s", host, port);
-    this.executorService = Executors.newSingleThreadScheduledExecutor();
+    initThreadPool();
     this.availableNodes =
         NodesSupplier.createNodeSupplier(
             Collections.singletonList(new TEndPoint(host, port)),
@@ -425,7 +425,7 @@ public class SessionPool implements ISessionPool {
     this.useSSL = useSSL;
     this.trustStore = trustStore;
     this.trustStorePwd = trustStorePwd;
-    this.executorService = Executors.newSingleThreadScheduledExecutor();
+    initThreadPool();
     this.availableNodes =
         NodesSupplier.createNodeSupplier(
             Collections.singletonList(new TEndPoint(host, port)),
@@ -477,7 +477,7 @@ public class SessionPool implements ISessionPool {
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
     this.thriftMaxFrameSize = thriftMaxFrameSize;
     this.formattedNodeUrls = nodeUrls.toString();
-    this.executorService = Executors.newSingleThreadScheduledExecutor();
+    initThreadPool();
     this.availableNodes =
         NodesSupplier.createNodeSupplier(
             SessionUtils.parseSeedNodeUrls(nodeUrls),
@@ -511,36 +511,51 @@ public class SessionPool implements ISessionPool {
     this.version = builder.version;
     this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize;
     this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
+    initThreadPool();
     if (builder.nodeUrls != null && builder.nodeUrls.size() > 0) {
       this.nodeUrls = builder.nodeUrls;
       this.host = null;
       this.port = -1;
       this.formattedNodeUrls = builder.nodeUrls.toString();
+      this.availableNodes =
+          NodesSupplier.createNodeSupplier(
+              SessionUtils.parseSeedNodeUrls(nodeUrls),
+              executorService,
+              user,
+              password,
+              zoneId,
+              thriftDefaultBufferSize,
+              thriftMaxFrameSize,
+              connectionTimeoutInMs,
+              useSSL,
+              trustStore,
+              trustStorePwd,
+              enableCompression,
+              version.toString());
     } else {
       this.host = builder.host;
       this.port = builder.port;
       this.nodeUrls = null;
       this.formattedNodeUrls = String.format("%s:%s", host, port);
+      this.availableNodes =
+          NodesSupplier.createNodeSupplier(
+              Collections.singletonList(new TEndPoint(host, port)),
+              executorService,
+              user,
+              password,
+              zoneId,
+              thriftDefaultBufferSize,
+              thriftMaxFrameSize,
+              connectionTimeoutInMs,
+              useSSL,
+              trustStore,
+              trustStorePwd,
+              enableCompression,
+              version.toString());
     }
     this.useSSL = builder.useSSL;
     this.trustStore = builder.trustStore;
     this.trustStorePwd = builder.trustStorePwd;
-    this.executorService = Executors.newSingleThreadScheduledExecutor();
-    this.availableNodes =
-        NodesSupplier.createNodeSupplier(
-            SessionUtils.parseSeedNodeUrls(nodeUrls),
-            executorService,
-            user,
-            password,
-            zoneId,
-            thriftDefaultBufferSize,
-            thriftMaxFrameSize,
-            connectionTimeoutInMs,
-            useSSL,
-            trustStore,
-            trustStorePwd,
-            enableCompression,
-            version.toString());
   }
 
   private Session constructNewSession() {
@@ -585,6 +600,23 @@ public class SessionPool implements ISessionPool {
     return session;
   }
 
+  private void initThreadPool() {
+    this.executorService =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread t =
+                  new Thread(
+                      Thread.currentThread().getThreadGroup(), r, 
"PeriodicalUpdateDNList", 0);
+              if (!t.isDaemon()) {
+                t.setDaemon(true);
+              }
+              if (t.getPriority() != Thread.NORM_PRIORITY) {
+                t.setPriority(Thread.NORM_PRIORITY);
+              }
+              return t;
+            });
+  }
+
   // if this method throws an exception, either the server is broken, or the 
ip/port/user/password
   // is incorrect.
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive 
Complexity warning
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a1ab241fa6a..7c7ecfd9524 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -380,8 +380,9 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
                 : readExecutor.execute(groupId, instance);
         if (!readResult.isAccepted()) {
           logger.warn(readResult.getMessage());
+          
queryContext.addFailedEndPoint(instance.getHostDataNode().internalEndPoint);
           throw new FragmentInstanceDispatchException(
-              RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
readResult.getMessage()));
+              RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, 
readResult.getMessage()));
         }
         break;
       case WRITE:
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties
 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties
index 72acbce4157..a7ed41f2615 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-dn_rpc_address=0.0.0.0
+dn_rpc_address=127.0.0.1
 dn_internal_address=127.0.0.1
 
 dn_rpc_port=6667
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties
 
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties
index b1037c8457b..a8508a79678 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-dn_rpc_address=0.0.0.0
+dn_rpc_address=127.0.0.1
 dn_internal_address=127.0.0.1
 
 dn_rpc_port=6668
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties
 
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties
index 7ce924a5a61..d4faf82e09b 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-dn_rpc_address=0.0.0.0
+dn_rpc_address=127.0.0.1
 dn_internal_address=127.0.0.1
 
 dn_rpc_port=6669

Reply via email to