This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-6199
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-6199 by this push:
new 7447af76882 self test
7447af76882 is described below
commit 7447af768822d7c7f5fc01c7a6ca346e6fa1ae64
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Dec 12 16:05:00 2023 +0800
self test
---
.../org/apache/iotdb/session/NodesSupplier.java | 16 +++--
.../java/org/apache/iotdb/session/Session.java | 19 +++++-
.../apache/iotdb/session/SessionConnection.java | 1 +
.../org/apache/iotdb/session/ThriftConnection.java | 2 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 70 ++++++++++++++++------
.../scheduler/FragmentInstanceDispatcherImpl.java | 3 +-
.../datanode1conf/iotdb-datanode.properties | 2 +-
.../datanode2conf/iotdb-datanode.properties | 2 +-
.../datanode3conf/iotdb-datanode.properties | 2 +-
9 files changed, 86 insertions(+), 31 deletions(-)
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
index e48bdca2ab0..47f7270511d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/NodesSupplier.java
@@ -138,7 +138,7 @@ public class NodesSupplier implements INodeSupplier, Runnable {
this.trustStore = trustStore;
this.trustStorePwd = trustStorePwd;
this.enableRPCCompression = enableRPCCompression;
- this.zoneId = zoneId;
+ this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.connectionTimeoutInMs = connectionTimeoutInMs;
@@ -183,7 +183,7 @@ public class NodesSupplier implements INodeSupplier, Runnable {
version);
return true;
} catch (Exception e) {
- LOGGER.warn("Failed to create connection with {}.", endPoint, e);
+ LOGGER.warn("Failed to create connection with {}.", endPoint);
close();
return false;
}
@@ -201,8 +201,10 @@ public class NodesSupplier implements INodeSupplier, Runnable {
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
List<TEndPoint> res = new ArrayList<>();
while (iterator.next()) {
- if (RUNNING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
- String ip = iterator.getString(IP_COLUMN_NAME);
+ String ip = iterator.getString(IP_COLUMN_NAME);
+ // ignore 0.0.0.0 and not running DN
+ if (RUNNING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))
+ && !"0.0.0.0".equals(ip)) {
String port = iterator.getString(PORT_COLUMN_NAME);
if (ip != null && port != null) {
res.add(new TEndPoint(ip, Integer.parseInt(port)));
@@ -210,10 +212,12 @@ public class NodesSupplier implements INodeSupplier, Runnable {
}
}
// replace the older ones.
- availableNodes = new CopyOnWriteArraySet<>(res);
+ if (!res.isEmpty()) {
+ availableNodes = new CopyOnWriteArraySet<>(res);
+ }
return true;
} catch (Exception e) {
- LOGGER.warn("Failed to fetch data node list from {}.", client.endPoint, e);
+ LOGGER.warn("Failed to fetch data node list from {}.", client.endPoint);
return false;
}
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 142aea1f4fb..af6166f4103 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -459,7 +459,7 @@ public class Session implements ISession {
this.availableNodes = null;
}
- this.executorService = Executors.newSingleThreadScheduledExecutor();
+ initThreadPool();
this.availableNodes =
NodesSupplier.createNodeSupplier(
getNodeUrls(),
@@ -487,6 +487,23 @@ public class Session implements ISession {
}
}
+ private void initThreadPool() {
+ this.executorService =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t =
+ new Thread(
+ Thread.currentThread().getThreadGroup(), r, "PeriodicalUpdateDNList", 0);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ });
+ }
+
private List<TEndPoint> getNodeUrls() {
if (defaultEndPoint != null) {
return Collections.singletonList(defaultEndPoint);
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index a4a2bedbacd..fdda3c31fb7 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -966,6 +966,7 @@ public class SessionConnection {
for (int i = 1; i <= SessionConfig.RETRY_NUM; i++) {
if (transport != null) {
transport.close();
+ endPointList = availableNodes.get();
int currHostIndex = random.nextInt(endPointList.size());
int tryHostNum = 0;
for (int j = currHostIndex; j < endPointList.size(); j++) {
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
index e867f961f14..91f055f55b5 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java
@@ -180,7 +180,7 @@ public class ThriftConnection {
try {
client.closeSession(new TSCloseSessionReq(sessionId));
} catch (TException e) {
- LOGGER.warn("Closing Session-{} with {} failed", sessionId, endPoint, e);
+ LOGGER.warn("Closing Session-{} with {} failed.", sessionId, endPoint);
if (transport.isOpen()) {
transport.close();
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 902c8ded417..ac8b90c34e5 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -367,7 +367,7 @@ public class SessionPool implements ISessionPool {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.formattedNodeUrls = String.format("%s:%s", host, port);
- this.executorService = Executors.newSingleThreadScheduledExecutor();
+ initThreadPool();
this.availableNodes =
NodesSupplier.createNodeSupplier(
Collections.singletonList(new TEndPoint(host, port)),
@@ -425,7 +425,7 @@ public class SessionPool implements ISessionPool {
this.useSSL = useSSL;
this.trustStore = trustStore;
this.trustStorePwd = trustStorePwd;
- this.executorService = Executors.newSingleThreadScheduledExecutor();
+ initThreadPool();
this.availableNodes =
NodesSupplier.createNodeSupplier(
Collections.singletonList(new TEndPoint(host, port)),
@@ -477,7 +477,7 @@ public class SessionPool implements ISessionPool {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.formattedNodeUrls = nodeUrls.toString();
- this.executorService = Executors.newSingleThreadScheduledExecutor();
+ initThreadPool();
this.availableNodes =
NodesSupplier.createNodeSupplier(
SessionUtils.parseSeedNodeUrls(nodeUrls),
@@ -511,36 +511,51 @@ public class SessionPool implements ISessionPool {
this.version = builder.version;
this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize;
this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
+ initThreadPool();
if (builder.nodeUrls != null && builder.nodeUrls.size() > 0) {
this.nodeUrls = builder.nodeUrls;
this.host = null;
this.port = -1;
this.formattedNodeUrls = builder.nodeUrls.toString();
+ this.availableNodes =
+ NodesSupplier.createNodeSupplier(
+ SessionUtils.parseSeedNodeUrls(nodeUrls),
+ executorService,
+ user,
+ password,
+ zoneId,
+ thriftDefaultBufferSize,
+ thriftMaxFrameSize,
+ connectionTimeoutInMs,
+ useSSL,
+ trustStore,
+ trustStorePwd,
+ enableCompression,
+ version.toString());
} else {
this.host = builder.host;
this.port = builder.port;
this.nodeUrls = null;
this.formattedNodeUrls = String.format("%s:%s", host, port);
+ this.availableNodes =
+ NodesSupplier.createNodeSupplier(
+ Collections.singletonList(new TEndPoint(host, port)),
+ executorService,
+ user,
+ password,
+ zoneId,
+ thriftDefaultBufferSize,
+ thriftMaxFrameSize,
+ connectionTimeoutInMs,
+ useSSL,
+ trustStore,
+ trustStorePwd,
+ enableCompression,
+ version.toString());
}
this.useSSL = builder.useSSL;
this.trustStore = builder.trustStore;
this.trustStorePwd = builder.trustStorePwd;
- this.executorService = Executors.newSingleThreadScheduledExecutor();
- this.availableNodes =
- NodesSupplier.createNodeSupplier(
- SessionUtils.parseSeedNodeUrls(nodeUrls),
- executorService,
- user,
- password,
- zoneId,
- thriftDefaultBufferSize,
- thriftMaxFrameSize,
- connectionTimeoutInMs,
- useSSL,
- trustStore,
- trustStorePwd,
- enableCompression,
- version.toString());
}
private Session constructNewSession() {
@@ -585,6 +600,23 @@ public class SessionPool implements ISessionPool {
return session;
}
+ private void initThreadPool() {
+ this.executorService =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t =
+ new Thread(
+ Thread.currentThread().getThreadGroup(), r, "PeriodicalUpdateDNList", 0);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ });
+ }
+
// if this method throws an exception, either the server is broken, or the ip/port/user/password
// is incorrect.
@SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a1ab241fa6a..7c7ecfd9524 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -380,8 +380,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
: readExecutor.execute(groupId, instance);
if (!readResult.isAccepted()) {
logger.warn(readResult.getMessage());
+ queryContext.addFailedEndPoint(instance.getHostDataNode().internalEndPoint);
throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, readResult.getMessage()));
+ RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, readResult.getMessage()));
}
break;
case WRITE:
diff --git a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties
index 72acbce4157..a7ed41f2615 100644
--- a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-datanode.properties
@@ -17,7 +17,7 @@
# under the License.
#
-dn_rpc_address=0.0.0.0
+dn_rpc_address=127.0.0.1
dn_internal_address=127.0.0.1
dn_rpc_port=6667
diff --git a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties
index b1037c8457b..a8508a79678 100644
--- a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-datanode.properties
@@ -17,7 +17,7 @@
# under the License.
#
-dn_rpc_address=0.0.0.0
+dn_rpc_address=127.0.0.1
dn_internal_address=127.0.0.1
dn_rpc_port=6668
diff --git a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties
index 7ce924a5a61..d4faf82e09b 100644
--- a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-datanode.properties
@@ -17,7 +17,7 @@
# under the License.
#
-dn_rpc_address=0.0.0.0
+dn_rpc_address=127.0.0.1
dn_internal_address=127.0.0.1
dn_rpc_port=6669