This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new 5738db0531 tmp save
5738db0531 is described below
commit 5738db0531740e7ae00a6a6bd19f0a19b96d0ff7
Author: Tian Jiang <[email protected]>
AuthorDate: Sat Oct 8 22:39:58 2022 +0800
tmp save
---
.../iotdb/cluster/log/FragmentedLogDispatcher.java | 33 ++++++----------------
.../apache/iotdb/cluster/log/LogDispatcher.java | 2 +-
.../apache/iotdb/cluster/utils/ClusterUtils.java | 2 +-
3 files changed, 11 insertions(+), 26 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index 3a77d15eb0..49dcc642d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.log;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -31,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
public class FragmentedLogDispatcher extends LogDispatcher {
@@ -55,29 +53,16 @@ public class FragmentedLogDispatcher extends LogDispatcher {
fragmentedRequest
.getVotingLog()
.setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i++));
- try {
- boolean addSucceeded;
- if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
- addSucceeded =
- nodeLogQueue.offer(
- fragmentedRequest,
- ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
- TimeUnit.MILLISECONDS);
- } else {
- addSucceeded = nodeLogQueue.add(fragmentedRequest);
- }
-
- if (!addSucceeded) {
- logger.debug(
- "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
- } else {
- request.setEnqueueTime(System.nanoTime());
- }
- } catch (IllegalStateException e) {
+
+ boolean addSucceeded = addToQueue(nodeLogQueue, request);
+
+ if (!addSucceeded) {
logger.debug(
- "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ "Log queue[{}] of {} is full, ignore the request to this node",
+ entry.left,
+ member.getName());
+ } else {
+ request.setEnqueueTime(System.nanoTime());
}
}
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 10c27d52f1..a79bb8098a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -134,7 +134,7 @@ public class LogDispatcher {
return newRequest;
}
- private boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
+ protected boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
long waitStart = System.currentTimeMillis();
long waitTime = 1;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 87e2d1475b..02e8703e64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -389,6 +389,6 @@ public class ClusterUtils {
}
public static String nodeToString(Node node) {
- return node.getInternalIp() + ":" + node.getMetaPort();
+ return node.getInternalIp() + "-" + node.getMetaPort();
}
}