vongosling closed pull request #153: [ROCKETMQ-272] Fix sync slave timeout when using SYNC_MASTER
URL: https://github.com/apache/rocketmq/pull/153
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a foreign (fork) pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index edd68a584..42c1c218a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -667,7 +667,7 @@ public void handleHA(AppendMessageResult result, 
PutMessageResult putMessageResu
                     service.putRequest(request);
                     service.getWaitNotifyObject().wakeupAll();
                     boolean flushOK =
-                        
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                        
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncSlaveTimeout());
                     if (!flushOK) {
                         log.error("do sync transfer other node, wait return, 
but failed, topic: " + messageExt.getTopic() + " tags: "
                             + messageExt.getTags() + " client address: " + 
messageExt.getBornHostNameString());
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 02aa84a3e..5a84f0806 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -126,6 +126,7 @@
     @ImportantField
     private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
     private int syncFlushTimeout = 1000 * 5;
+    private int syncSlaveTimeout = 1000 * 5;
     private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 
9m 10m 20m 30m 1h 2h";
     private long flushDelayOffsetInterval = 1000 * 10;
     @ImportantField
@@ -519,6 +520,14 @@ public int getSyncFlushTimeout() {
         return syncFlushTimeout;
     }
 
+    public void setSyncSlaveTimeout(int syncSlaveTimeout) {
+        this.syncSlaveTimeout = syncSlaveTimeout;
+    }
+
+    public int getSyncSlaveTimeout() {
+        return syncSlaveTimeout;
+    }
+
     public void setSyncFlushTimeout(int syncFlushTimeout) {
         this.syncFlushTimeout = syncFlushTimeout;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 51a8a2703..a25093852 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -280,13 +280,17 @@ private void doWaitTransfer() {
                 if (!this.requestsRead.isEmpty()) {
                     for (CommitLog.GroupCommitRequest req : this.requestsRead) 
{
                         boolean transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                        for (int i = 0; !transferOK && i < 5; i++) {
+
+                        long startTime = System.currentTimeMillis();
+                        long stopTime = startTime + 
HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncSlaveTimeout();
+
+                        while (!transferOK && System.currentTimeMillis() < 
stopTime) {
                             this.notifyTransferObject.waitForRunning(1000);
                             transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                         }
 
                         if (!transferOK) {
-                            log.warn("transfer messsage to slave timeout, " + 
req.getNextOffset());
+                            log.warn("transfer message to slave timeout, " + 
req.getNextOffset());
                         }
 
                         req.wakeupCustomer(transferOK);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to