This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_receiver_wait
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_receiver_wait by this push:
new 8eed545 extract duplicates
8eed545 is described below
commit 8eed5458ac2c66ffcb4be7261e0ed1eb5a4cd6f5
Author: jt2594838 <[email protected]>
AuthorDate: Fri Aug 28 10:49:11 2020 +0800
extract duplicates
---
.../iotdb/cluster/server/member/RaftMember.java | 57 +++++++++-------------
1 file changed, 23 insertions(+), 34 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4bc6da2..0cf0ff9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -739,14 +739,8 @@ public abstract class RaftMember {
Peer peer) {
AsyncClient client = getAsyncClient(node);
if (client != null) {
- AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
- handler.setReceiver(node);
- handler.setVoteCounter(voteCounter);
- handler.setLeaderShipStale(leaderShipStale);
- handler.setLog(log);
- handler.setMember(this);
- handler.setPeer(peer);
- handler.setReceiverTerm(newLeaderTerm);
+ AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, voteCounter, node,
+ leaderShipStale, newLeaderTerm, peer);
try {
client.appendEntry(request, handler);
logger.debug("{} sending a log to {}: {}", name, node, log);
@@ -756,19 +750,26 @@ public abstract class RaftMember {
}
}
+ private AppendNodeEntryHandler getAppendNodeEntryHandler(Log log, AtomicInteger voteCounter,
+ Node node, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, Peer peer) {
+ AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
+ handler.setReceiver(node);
+ handler.setVoteCounter(voteCounter);
+ handler.setLeaderShipStale(leaderShipStale);
+ handler.setLog(log);
+ handler.setMember(this);
+ handler.setPeer(peer);
+ handler.setReceiverTerm(newLeaderTerm);
+ return handler;
+ }
+
private void sendLogSync(Log log, AtomicInteger voteCounter, Node node,
AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, AppendEntryRequest request,
Peer peer) {
Client client = getSyncClient(node);
if (client != null) {
- AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
- handler.setReceiver(node);
- handler.setVoteCounter(voteCounter);
- handler.setLeaderShipStale(leaderShipStale);
- handler.setLog(log);
- handler.setMember(this);
- handler.setPeer(peer);
- handler.setReceiverTerm(newLeaderTerm);
+ AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, voteCounter,
+ node, leaderShipStale, newLeaderTerm, peer);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
long result = client.appendEntry(request);
@@ -836,13 +837,17 @@ public abstract class RaftMember {
* @return
*/
public Client getSyncClient(Node node) {
+ return getSyncClient(syncClientPool, node);
+ }
+
+ private Client getSyncClient(SyncClientPool pool, Node node) {
if (node == null) {
return null;
}
Client client;
do {
- client = syncClientPool.getClient(node);
+ client = pool.getClient(node);
if (client == null) {
try {
Thread.sleep(syncClientTimeoutMills);
@@ -862,23 +867,7 @@ public abstract class RaftMember {
* @return the heartbeat client for the node
*/
public Client getSyncHeartbeatClient(Node node) {
- if (node == null) {
- return null;
- }
-
- Client client;
- do {
- client = syncHeartbeatClientPool.getClient(node);
- if (client == null) {
- try {
- Thread.sleep(syncClientTimeoutMills);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- }
- }
- } while (client == null);
- return client;
+ return getSyncClient(syncHeartbeatClientPool, node);
}
private boolean isClientReady(AsyncClient client) {