This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
new 4aec1cc reimpl raft node as manager
4aec1cc is described below
commit 4aec1cc4e411f0e43066622e4a44361bdc1653fa
Author: lta <[email protected]>
AuthorDate: Wed May 1 10:31:06 2019 +0800
reimpl raft node as manager
---
.travis.yml | 26 +++++-----
.../rpc/raft/impl/RaftNodeAsClientManager.java | 60 +++++++---------------
2 files changed, 31 insertions(+), 55 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 077d8ab..cc24d5e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -73,19 +73,19 @@ matrix:
- mvn -version
- mvn -B clean integration-test
- - os: linux
- name: linux-openjdk11
- dist: trusty
- sudo: required
- before_install:
- - wget
https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
-O jdk11.tar.gz
- - tar -xzf jdk11.tar.gz
- before_script:
- - export JAVA_HOME=$PWD/jdk-11.0.2/
- - export PATH=$JAVA_HOME/bin:$PATH
- script:
- - java -version
- - mvn -B clean integration-test
+# - os: linux
+# name: linux-openjdk11
+# dist: trusty
+# sudo: required
+# before_install:
+# - wget
https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
-O jdk11.tar.gz
+# - tar -xzf jdk11.tar.gz
+# before_script:
+# - export JAVA_HOME=$PWD/jdk-11.0.2/
+# - export PATH=$JAVA_HOME/bin:$PATH
+# script:
+# - java -version
+# - mvn -B clean integration-test
- os: linux
name: linux-openjdk8
dist: trusty
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 10598ae..3d14ab4 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -55,8 +55,8 @@ public class RaftNodeAsClientManager {
private static final int TASK_TIMEOUT_MS = CLUSTER_CONFIG.getQpTaskTimeout();
/**
- * Max valid number of @NodeAsClient usage, represent the number can
triggerAction simultaneously at the
- * same time
+ * Max valid number of @NodeAsClient usage, represent the number can
triggerAction simultaneously
+ * at the same time
*/
private static final int MAX_VALID_CLIENT_NUM =
CLUSTER_CONFIG.getMaxNumOfInnerRpcClient();
@@ -95,11 +95,6 @@ public class RaftNodeAsClientManager {
*/
private volatile boolean isShuttingDown;
- /**
- * Interval of thread sleep, unit is millisecond.
- */
- private static final int THREAD_SLEEP_INTERVAL = 10;
-
private RaftNodeAsClientManager() {
}
@@ -120,48 +115,31 @@ public class RaftNodeAsClientManager {
CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
.getMaxQueueNumOfInnerRpcClient()));
}
- checkShuttingDown();
- if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
- clientNumInUse.incrementAndGet();
- return getClient();
- }
- } finally {
- resourceLock.unlock();
- }
- return tryToGetClient();
- }
-
- private void checkShuttingDown() throws RaftConnectionException {
- if (isShuttingDown) {
- throw new RaftConnectionException(
- "Reject to provide RaftNodeAsClient client because cluster system is
shutting down");
- }
- }
-
- /**
- * Check whether it can get the clientList
- */
- private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
- resourceLock.lock();
- queueClientNum++;
- try {
- while (true) {
- if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+ queueClientNum++;
+ try {
+ while (true) {
checkShuttingDown();
if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
clientNumInUse.incrementAndGet();
return getClient();
}
+ resourceCondition.await();
}
- resourceCondition.await();
+ } catch (InterruptedException e) {
+ throw new RaftConnectionException("An error occurred when trying to
get NodeAsClient", e);
+ } finally {
+ queueClientNum--;
}
- } catch (InterruptedException e) {
- throw new RaftConnectionException("An error occurred when trying to get
NodeAsClient", e);
} finally {
- queueClientNum--;
resourceLock.unlock();
}
+ }
+ private void checkShuttingDown() throws RaftConnectionException {
+ if (isShuttingDown) {
+ throw new RaftConnectionException(
+ "Reject to provide RaftNodeAsClient client because cluster system is
shutting down");
+ }
}
/**
@@ -182,9 +160,7 @@ public class RaftNodeAsClientManager {
resourceLock.lock();
try {
clientNumInUse.decrementAndGet();
- if (clientList.isEmpty() && queueClientNum > 0) {
- resourceCondition.signal();
- }
+ resourceCondition.signalAll();
clientList.addLast(client);
} finally {
resourceLock.unlock();
@@ -195,7 +171,7 @@ public class RaftNodeAsClientManager {
isShuttingDown = true;
while (clientNumInUse.get() != 0 && queueClientNum != 0) {
// wait until releasing all usage of clients.
- Thread.sleep(THREAD_SLEEP_INTERVAL);
+ resourceCondition.await();
}
while (!clientList.isEmpty()) {
clientList.removeFirst().shutdown();