This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch fix_invalid_log in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 0decbec755f442bcfd324bea6f4ad67130f04c51
Author: RongtongJin <[email protected]>
AuthorDate: Wed Aug 31 10:38:08 2022 +0800

    Fix invalid log output after the master and slave broker are started on the empty disk in controller mode
---
 .../java/org/apache/rocketmq/broker/failover/EscapeBridge.java    | 8 ++------
 .../rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java      | 7 +++++++
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 60860db74..25d449170 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -171,12 +171,8 @@ public class EscapeBridge {
                     producerGroup, SEND_TIMEOUT);
 
             return future.exceptionally(throwable -> null)
-                .thenApplyAsync(sendResult -> {
-                    return transformSendResult2PutResult(sendResult);
-                }, this.defaultAsyncSenderExecutor)
-                .exceptionally(throwable -> {
-                    return transformSendResult2PutResult(null);
-                });
+                .thenApplyAsync(sendResult -> transformSendResult2PutResult(sendResult), this.defaultAsyncSenderExecutor)
+                .exceptionally(throwable -> transformSendResult2PutResult(null));
 
         } catch (Exception e) {
             LOG.error("sendMessageInFailover to remote failed", e);
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 63e4b6b9e..7b018ea7f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -661,6 +661,13 @@ public class AutoSwitchHAConnection implements HAConnection {
                         } else {
                             this.nextTransferFromWhere = slaveRequestOffset;
                         }
+
+                        // nextTransferFromWhere is not found. It may be empty disk and no message is entered
+                        if (this.nextTransferFromWhere == -1) {
+                            sendHeartbeatIfNeeded();
+                            waitForRunning(500);
+                            break;
+                        }
                         // Setup initial transferEpoch
                         EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.findEpochEntryByOffset(this.nextTransferFromWhere);
                         if (epochEntry == null) {
