This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch fix_invalid_log
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 0decbec755f442bcfd324bea6f4ad67130f04c51
Author: RongtongJin <[email protected]>
AuthorDate: Wed Aug 31 10:38:08 2022 +0800

    Fix invalid log output after the master and slave broker are started on the empty disk in controller mode
---
 .../java/org/apache/rocketmq/broker/failover/EscapeBridge.java    | 8 ++------
 .../rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java      | 7 +++++++
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 60860db74..25d449170 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -171,12 +171,8 @@ public class EscapeBridge {
                     producerGroup, SEND_TIMEOUT);
 
                 return future.exceptionally(throwable -> null)
-                    .thenApplyAsync(sendResult -> {
-                        return transformSendResult2PutResult(sendResult);
-                    }, this.defaultAsyncSenderExecutor)
-                    .exceptionally(throwable -> {
-                        return transformSendResult2PutResult(null);
-                    });
+                    .thenApplyAsync(sendResult -> 
transformSendResult2PutResult(sendResult), this.defaultAsyncSenderExecutor)
+                    .exceptionally(throwable -> 
transformSendResult2PutResult(null));
 
             } catch (Exception e) {
                 LOG.error("sendMessageInFailover to remote failed", e);
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 63e4b6b9e..7b018ea7f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -661,6 +661,13 @@ public class AutoSwitchHAConnection implements HAConnection {
                                 } else {
                                     this.nextTransferFromWhere = 
slaveRequestOffset;
                                 }
+
+                                // nextTransferFromWhere is not found. It may 
be empty disk and no message is entered
+                                if (this.nextTransferFromWhere == -1) {
+                                    sendHeartbeatIfNeeded();
+                                    waitForRunning(500);
+                                    break;
+                                }
                                 // Setup initial transferEpoch
                                 EpochEntry epochEntry = 
AutoSwitchHAConnection.this.epochCache.findEpochEntryByOffset(this.nextTransferFromWhere);
                                 if (epochEntry == null) {

Reply via email to