This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 81dc9386a Make broker do not online before get brokerId and master-slave relationship (#4433)
81dc9386a is described below

commit 81dc9386a1a5f932896bea3ddfa9452cddea1786
Author: rongtong <[email protected]>
AuthorDate: Thu Jun 9 11:48:49 2022 +0800

    Make broker do not online before get brokerId and master-slave relationship (#4433)
    
    * Make broker do not online before get brokerId and master-slave relationship
    
    * Make broker do not online before get brokerId and master-slave relationship
    
    * Polish the set isolated code
    
    * Remove disableWrite operation in role change
    
    * Polish the config name
---
 .../apache/rocketmq/broker/BrokerController.java   | 10 +++--
 .../broker/hacontroller/ReplicasManager.java       | 26 ++++-------
 .../broker/plugin/AbstractPluginMessageStore.java  |  8 ----
 .../org/apache/rocketmq/common/BrokerConfig.java   | 30 ++++++-------
 .../java/org/apache/rocketmq/store/CommitLog.java  |  4 --
 .../apache/rocketmq/store/DefaultMessageStore.java | 24 +++-------
 .../org/apache/rocketmq/store/MessageStore.java    | 51 +++++++++++++---------
 .../ha/autoswitch/AutoSwitchHAConnection.java      |  4 +-
 .../test/autoswitchrole/AutoSwitchRoleBase.java    |  4 +-
 9 files changed, 72 insertions(+), 89 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8f961e7c5..0eef0098b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -378,7 +378,7 @@ public class BrokerController {
 
         this.escapeBridge = new EscapeBridge(this);
 
-        if (!this.brokerConfig.isSkipPreOnline()) {
+        if (this.brokerConfig.isEnableSlaveActingMaster() && 
!this.brokerConfig.isSkipPreOnline()) {
             this.brokerPreOnlineService = new BrokerPreOnlineService(this);
         }
     }
@@ -1443,7 +1443,7 @@ public class BrokerController {
 
         this.shouldStartTime = System.currentTimeMillis() + 
messageStoreConfig.getDisappearTimeAfterStart();
 
-        if (messageStoreConfig.getTotalReplicas() > 1 && 
this.brokerConfig.isEnableSlaveActingMaster()) {
+        if ((messageStoreConfig.getTotalReplicas() > 1 && 
this.brokerConfig.isEnableSlaveActingMaster()) || 
this.brokerConfig.isEnableControllerMode()) {
             isIsolated = true;
         }
 
@@ -1453,7 +1453,7 @@ public class BrokerController {
 
         startBasicService();
 
-        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() 
&& !this.messageStoreConfig.isDuplicationEnable() && 
!this.brokerConfig.isEnableControllerMode()) {
+        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() 
&& !this.messageStoreConfig.isDuplicationEnable()) {
             changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == 
MixAll.MASTER_ID);
             this.registerBrokerAll(true, false, true);
         }
@@ -2179,6 +2179,10 @@ public class BrokerController {
         return replicasManager;
     }
 
+    public void setIsolated(boolean isolated) {
+        isIsolated = isolated;
+    }
+
     public boolean isIsolated() {
         return this.isIsolated;
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 40ac5d1e2..88a93abd9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -162,8 +162,6 @@ public class ReplicasManager {
             if (newMasterEpoch > this.masterEpoch) {
                 LOGGER.info("Begin to change to master, brokerName:{}, 
replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), 
this.localAddress, newMasterEpoch);
 
-                brokerController.getMessageStore().disableWrite();
-
                 // Change record
                 this.masterAddress = this.localAddress;
                 this.masterEpoch = newMasterEpoch;
@@ -174,17 +172,15 @@ public class ReplicasManager {
                 changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
                 schedulingCheckSyncStateSet();
 
-                
this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
-                
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
-                this.brokerController.changeSpecialServiceStatus(true);
-
                 // Handle the slave synchronise
                 handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
 
                 // Notify ha service, change to master
                 this.haService.changeToMaster(newMasterEpoch);
 
-                brokerController.getMessageStore().enableWrite();
+                
this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
+                
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
+                this.brokerController.changeSpecialServiceStatus(true);
 
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
@@ -205,8 +201,6 @@ public class ReplicasManager {
             if (newMasterEpoch > this.masterEpoch) {
                 LOGGER.info("Begin to change to slave, brokerName={}, 
replicas={}, brokerId={}", this.brokerConfig.getBrokerName(), 
this.localAddress, brokerId);
 
-                brokerController.getMessageStore().disableWrite();
-
                 // Change record
                 this.masterAddress = newMasterAddress;
                 this.masterEpoch = newMasterEpoch;
@@ -224,8 +218,6 @@ public class ReplicasManager {
                 // Notify ha service, change to slave
                 this.haService.changeToSlave(newMasterAddress, newMasterEpoch, 
this.brokerConfig.getBrokerId());
 
-                brokerController.getMessageStore().enableWrite();
-
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
@@ -248,8 +240,6 @@ public class ReplicasManager {
                 this.syncStateSetEpoch = newSyncStateSetEpoch;
                 this.syncStateSet = new HashSet<>(newSyncStateSet);
                 this.haService.setSyncStateSet(newSyncStateSet);
-            } else {
-                LOGGER.info("Sync state set changed failed, 
newSyncStateSetEpoch is {} and syncStateSetEpoch is {}", newSyncStateSetEpoch, 
this.syncStateSetEpoch);
             }
         }
     }
@@ -286,6 +276,8 @@ public class ReplicasManager {
                 } else {
                     changeToSlave(newMasterAddress, 
registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
                 }
+                // Set isolated to false, make broker can register to namesrv 
regularly
+                brokerController.setIsolated(false);
             }
             return true;
         } catch (final Exception e) {
@@ -329,7 +321,7 @@ public class ReplicasManager {
             } catch (final Exception e) {
                 LOGGER.warn("Error happen when get broker {}'s metadata", 
this.brokerConfig.getBrokerName(), e);
             }
-        }, 3 * 1000, 
this.brokerConfig.getReplicasManagerSyncBrokerMetadataPeriod(), 
TimeUnit.MILLISECONDS);
+        }, 3 * 1000, this.brokerConfig.getSyncBrokerMetadataPeriod(), 
TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -341,7 +333,7 @@ public class ReplicasManager {
         while (tryTimes < 3) {
             boolean flag = updateControllerMetadata();
             if (flag) {
-                
this.scheduledService.scheduleAtFixedRate(this::updateControllerMetadata, 1000 
* 3, this.brokerConfig.getReplicasManagerSyncControllerMetadataPeriod(), 
TimeUnit.MILLISECONDS);
+                
this.scheduledService.scheduleAtFixedRate(this::updateControllerMetadata, 1000 
* 3, this.brokerConfig.getSyncControllerMetadataPeriod(), 
TimeUnit.MILLISECONDS);
                 return true;
             }
             tryTimes++;
@@ -359,7 +351,7 @@ public class ReplicasManager {
                 final GetMetaDataResponseHeader responseHeader = 
this.brokerOuterAPI.getControllerMetaData(address);
                 if (responseHeader != null && 
StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) {
                     this.controllerLeaderAddress = 
responseHeader.getControllerLeaderAddress();
-                    LOGGER.info("Change controller leader address to {}", 
this.controllerLeaderAddress);
+                    LOGGER.info("Update controller leader address to {}", 
this.controllerLeaderAddress);
                     return true;
                 }
             } catch (final Exception e) {
@@ -388,7 +380,7 @@ public class ReplicasManager {
                 }
             }
             doReportSyncStateSetChanged(newSyncStateSet);
-        }, 3 * 1000, 
this.brokerConfig.getReplicasManagerCheckSyncStateSetPeriod(), 
TimeUnit.MILLISECONDS);
+        }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), 
TimeUnit.MILLISECONDS);
     }
 
     private void doReportSyncStateSetChanged(Set<String> newSyncStateSet) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 74680979c..42542210e 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -531,12 +531,4 @@ public abstract class AbstractPluginMessageStore 
implements MessageStore {
     @Override public boolean isShutdown() {
         return next.isShutdown();
     }
-
-    @Override public void disableWrite() {
-        next.disableWrite();
-    }
-
-    @Override public void enableWrite() {
-        next.enableWrite();
-    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 5bf85f86b..1b52d7713 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -307,11 +307,11 @@ public class BrokerConfig extends BrokerIdentity {
 
     private String controllerAddr = "";
 
-    private long replicasManagerSyncBrokerMetadataPeriod = 5 * 1000;
+    private long syncBrokerMetadataPeriod = 5 * 1000;
 
-    private long replicasManagerCheckSyncStateSetPeriod = 5 * 1000;
+    private long checkSyncStateSetPeriod = 5 * 1000;
 
-    private long replicasManagerSyncControllerMetadataPeriod = 10 * 1000;
+    private long syncControllerMetadataPeriod = 10 * 1000;
 
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
@@ -1313,27 +1313,27 @@ public class BrokerConfig extends BrokerIdentity {
         this.controllerAddr = controllerAddr;
     }
 
-    public long getReplicasManagerSyncBrokerMetadataPeriod() {
-        return replicasManagerSyncBrokerMetadataPeriod;
+    public long getSyncBrokerMetadataPeriod() {
+        return syncBrokerMetadataPeriod;
     }
 
-    public void setReplicasManagerSyncBrokerMetadataPeriod(long 
replicasManagerSyncBrokerMetadataPeriod) {
-        this.replicasManagerSyncBrokerMetadataPeriod = 
replicasManagerSyncBrokerMetadataPeriod;
+    public void setSyncBrokerMetadataPeriod(long syncBrokerMetadataPeriod) {
+        this.syncBrokerMetadataPeriod = syncBrokerMetadataPeriod;
     }
 
-    public long getReplicasManagerCheckSyncStateSetPeriod() {
-        return replicasManagerCheckSyncStateSetPeriod;
+    public long getCheckSyncStateSetPeriod() {
+        return checkSyncStateSetPeriod;
     }
 
-    public void setReplicasManagerCheckSyncStateSetPeriod(long 
replicasManagerCheckSyncStateSetPeriod) {
-        this.replicasManagerCheckSyncStateSetPeriod = 
replicasManagerCheckSyncStateSetPeriod;
+    public void setCheckSyncStateSetPeriod(long checkSyncStateSetPeriod) {
+        this.checkSyncStateSetPeriod = checkSyncStateSetPeriod;
     }
 
-    public long getReplicasManagerSyncControllerMetadataPeriod() {
-        return replicasManagerSyncControllerMetadataPeriod;
+    public long getSyncControllerMetadataPeriod() {
+        return syncControllerMetadataPeriod;
     }
 
-    public void setReplicasManagerSyncControllerMetadataPeriod(long 
replicasManagerSyncControllerMetadataPeriod) {
-        this.replicasManagerSyncControllerMetadataPeriod = 
replicasManagerSyncControllerMetadataPeriod;
+    public void setSyncControllerMetadataPeriod(long 
syncControllerMetadataPeriod) {
+        this.syncControllerMetadataPeriod = syncControllerMetadataPeriod;
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 98b4fc937..b95804bb3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -139,8 +139,6 @@ public class CommitLog implements Swappable {
     }
 
     public void start() {
-        this.flushManager.start();
-        log.info("start commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         this.flushManager.start();
         log.info("start commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         flushDiskWatcher.setDaemon(true);
@@ -148,8 +146,6 @@ public class CommitLog implements Swappable {
     }
 
     public void shutdown() {
-        this.flushManager.shutdown();
-        log.info("shutdown commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         this.flushManager.shutdown();
         log.info("shutdown commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         flushDiskWatcher.shutdown(true);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b08bc7574..7c57cf11f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -479,7 +479,7 @@ public class DefaultMessageStore implements MessageStore {
         }
 
         if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
-                && !MessageSysFlag.check(msg.getSysFlag(), 
MessageSysFlag.INNER_BATCH_FLAG)) {
+            && !MessageSysFlag.check(msg.getSysFlag(), 
MessageSysFlag.INNER_BATCH_FLAG)) {
             LOGGER.warn("[BUG]The message had property {} but is not an inner 
batch", MessageConst.PROPERTY_INNER_NUM);
             return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
         }
@@ -499,7 +499,7 @@ public class DefaultMessageStore implements MessageStore {
             long elapsedTime = this.getSystemClock().now() - beginTime;
             if (elapsedTime > 500) {
                 LOGGER.warn("DefaultMessageStore#putMessage: 
CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
-                    msg.getTopic(), msg.getBody().length);
+                    elapsedTime, msg.getTopic(), msg.getBody().length);
             }
             this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
 
@@ -552,15 +552,15 @@ public class DefaultMessageStore implements MessageStore {
     private PutMessageResult 
waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
         try {
             int putMessageTimeout =
-                    Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
-                            this.messageStoreConfig.getSlaveTimeout()) + 5000;
+                Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
+                    this.messageStoreConfig.getSlaveTimeout()) + 5000;
             return putMessageResultFuture.get(putMessageTimeout, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException | InterruptedException e) {
             return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
         } catch (TimeoutException e) {
             LOGGER.error("usually it will never timeout, putMessageTimeout is 
much bigger than slaveTimeout and "
-                    + "flushTimeout so the result can be got anyway, but in 
some situations timeout will happen like full gc "
-                    + "process hangs or other unexpected situations.");
+                + "flushTimeout so the result can be got anyway, but in some 
situations timeout will happen like full gc "
+                + "process hangs or other unexpected situations.");
             return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
         }
     }
@@ -1282,7 +1282,7 @@ public class DefaultMessageStore implements MessageStore {
                             int msgIdLength = (inetSocketAddress.getAddress() 
instanceof Inet6Address) ? 16 + 4 + 8 : 4 + 4 + 8;
                             final ByteBuffer msgIdMemory = 
ByteBuffer.allocate(msgIdLength);
                             String msgId =
-                                    
MessageDecoder.createMessageId(msgIdMemory, 
MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
+                                MessageDecoder.createMessageId(msgIdMemory, 
MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
                             messageIds.put(msgId, cqUnit.getQueueOffset());
                             nextOffset = cqUnit.getQueueOffset() + 
cqUnit.getBatchNum();
                             if (nextOffset >= maxOffset) {
@@ -1675,16 +1675,6 @@ public class DefaultMessageStore implements MessageStore 
{
         return runningFlags;
     }
 
-    @Override
-    public void disableWrite() {
-        runningFlags.getAndMakeNotWriteable();
-    }
-
-    @Override
-    public void enableWrite() {
-        runningFlags.getAndMakeWriteable();
-    }
-
     public void doDispatch(DispatchRequest req) {
         for (CommitLogDispatcher dispatcher : this.dispatcherList) {
             dispatcher.dispatch(req);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index b0ca83979..755c89b7b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -69,9 +69,9 @@ public interface MessageStore {
      */
     void destroy();
 
-    /** Store a message into store in async manner, the processor can process 
the next request
-     *  rather than wait for result
-     *  when result is completed, notify the client in async manner
+    /**
+     * Store a message into store in async manner, the processor can process 
the next request rather than wait for
+     * result when result is completed, notify the client in async manner
      *
      * @param msg MessageInstance to store
      * @return a CompletableFuture for the result of store operation
@@ -82,6 +82,7 @@ public interface MessageStore {
 
     /**
      * Store a batch of messages in async manner
+     *
      * @param messageExtBatch the message batch
      * @return a CompletableFuture for the result of store operation
      */
@@ -150,8 +151,7 @@ public interface MessageStore {
      *
      * @param topic Topic name.
      * @param queueId Queue ID.
-     * @param committed return the max offset in ConsumeQueue if true,
-     *                  or the max offset in CommitLog if false
+     * @param committed return the max offset in ConsumeQueue if true, or the 
max offset in CommitLog if false
      * @return Maximum offset at present.
      */
     long getMaxOffsetInQueue(final String topic, final int queueId, final 
boolean committed);
@@ -235,6 +235,7 @@ public interface MessageStore {
 
     /**
      * HA runtime information
+     *
      * @return runtime information of ha
      */
     HARuntimeInfo getHARuntimeInfo();
@@ -305,7 +306,6 @@ public interface MessageStore {
      */
     List<SelectMappedBufferResult> getBulkCommitLogData(final long offset, 
final int size);
 
-
     /**
      * Append data to commit log.
      *
@@ -341,7 +341,6 @@ public interface MessageStore {
      */
     void updateHaMasterAddress(final String newAddr);
 
-
     /**
      * Update master address.
      *
@@ -466,7 +465,6 @@ public interface MessageStore {
      */
     ConsumeQueueInterface getConsumeQueue(String topic, int queueId);
 
-
     /**
      * Get BrokerStatsManager of the messageStore.
      *
@@ -476,6 +474,7 @@ public interface MessageStore {
 
     /**
      * Will be triggered when a new message is appended to commit log.
+     *
      * @param msg the msg that is appended to commit log
      * @param result append message result
      * @param commitLogFile commit log file
@@ -484,70 +483,82 @@ public interface MessageStore {
 
     /**
      * Will be triggered when a new dispatch request is sent to message store.
+     *
      * @param dispatchRequest dispatch request
      * @param doDispatch do dispatch if true
      * @param commitLogFile commit log file
      * @param isRecover is from recover process
      * @param isFileEnd if the dispatch request represents 'file end'
      */
-    void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean 
doDispatch, MappedFile commitLogFile, boolean isRecover, boolean isFileEnd);
+    void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean 
doDispatch, MappedFile commitLogFile,
+        boolean isRecover, boolean isFileEnd);
 
     /**
      * Get the message store config
+     *
      * @return the message store config
      */
     MessageStoreConfig getMessageStoreConfig();
 
     /**
      * Get the statistics service
+     *
      * @return the statistics service
      */
     StoreStatsService getStoreStatsService();
 
     /**
      * Get the store checkpoint component
+     *
      * @return the checkpoint component
      */
     StoreCheckpoint getStoreCheckpoint();
 
     /**
      * Get the system clock
+     *
      * @return the system clock
      */
     SystemClock getSystemClock();
 
     /**
      * Get the commit log
+     *
      * @return the commit log
      */
     CommitLog getCommitLog();
 
     /**
      * Get running flags
+     *
      * @return running flags
      */
     RunningFlags getRunningFlags();
 
     /**
      * Get the transient store pool
+     *
      * @return the transient store pool
      */
     TransientStorePool getTransientStorePool();
 
     /**
      * Get the HA service
+     *
      * @return the HA service
      */
     HAService getHaService();
 
     /**
      * Get the allocate-mappedFile service
+     *
      * @return the allocate-mappedFile service
      */
     AllocateMappedFileService getAllocateMappedFileService();
 
     /**
      * Truncate dirty logic files
+     *
      * @param phyOffset physical offset
      */
     void truncateDirtyLogicFiles(long phyOffset);
@@ -559,37 +570,42 @@ public interface MessageStore {
 
     /**
      * Unlock mappedFile
+     *
      * @param unlockMappedFile the file that needs to be unlocked
      */
     void unlockMappedFile(MappedFile unlockMappedFile);
 
     /**
      * Get the perf counter component
+     *
      * @return the perf counter component
      */
     PerfCounter.Ticks getPerfCounter();
 
     /**
      * Get the queue store
+     *
      * @return the queue store
      */
     ConsumeQueueStore getQueueStore();
 
     /**
      * If 'sync disk flush' is configured in this message store
+     *
      * @return yes if true, no if false
      */
     boolean isSyncDiskFlush();
 
     /**
      * If this message store is sync master role
+     *
      * @return yes if true, no if false
      */
     boolean isSyncMaster();
 
     /**
-     * Assign an queue offset and increase it.
-     * If there is a race condition, you need to lock/unlock this method 
yourself.
+     * Assign an queue offset and increase it. If there is a race condition, 
you need to lock/unlock this method
+     * yourself.
      *
      * @param msg message
      * @param messageNum message num
@@ -598,6 +614,7 @@ public interface MessageStore {
 
     /**
      * get topic config
+     *
      * @param topic topic name
      * @return topic config info
      */
@@ -619,6 +636,7 @@ public interface MessageStore {
 
     /**
      * Use FileChannel to get data
+     *
      * @param offset
      * @param size
      * @param byteBuffer
@@ -645,7 +663,6 @@ public interface MessageStore {
      */
     void wakeupHAClient();
 
-
     /**
      * Get master flushed offset.
      *
@@ -731,6 +748,7 @@ public interface MessageStore {
 
     /**
      * Get last mapped file
+     *
      * @param startOffset
      * @return true when get the last mapped file, false when get null
      */
@@ -797,13 +815,4 @@ public interface MessageStore {
      */
     boolean isShutdown();
 
-    /*
-     * Make MessageStore not writeable, default is writeable
-     */
-    void disableWrite();
-
-    /*
-     * Make MessageStore not writeable, default is writeable
-     */
-    void enableWrite();
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 8ee6c985e..7d689b69a 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -323,7 +323,7 @@ public class AutoSwitchHAConnection implements HAConnection 
{
                                 byteBufferRead.position(readSocketPos);
                                 maybeExpandInSyncStateSet(slaveMaxOffset);
                                 
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
-                                LOGGER.info("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
+                                LOGGER.debug("slave[" + clientAddress + "] 
request offset " + slaveMaxOffset);
                                 break;
                             default:
                                 LOGGER.error("Current state illegal {}", 
currentState);
@@ -526,7 +526,7 @@ public class AutoSwitchHAConnection implements HAConnection 
{
             final long confirmOffset = 
AutoSwitchHAConnection.this.haService.getConfirmOffset();
             this.byteBufferHeader.putLong(confirmOffset);
             this.byteBufferHeader.flip();
-            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, 
epoch:{}, epochStartOffset:{}, confirmOffset:{}",
+            LOGGER.debug("Master send msg, state:{}, size:{}, offset:{}, 
epoch:{}, epochStartOffset:{}, confirmOffset:{}",
                 currentState, bodySize, nextOffset, entry.getEpoch(), 
entry.getStartOffset(), confirmOffset);
         }
 
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
 
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index f085488b2..849788320 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -81,8 +81,8 @@ public class AutoSwitchRoleBase {
         brokerConfig.setListenPort(brokerListenPort);
         brokerConfig.setNamesrvAddr(namesrvAddress);
         brokerConfig.setControllerAddr(controllerAddress);
-        brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
-        brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(2 * 1000);
+        brokerConfig.setSyncBrokerMetadataPeriod(2 * 1000);
+        brokerConfig.setCheckSyncStateSetPeriod(2 * 1000);
         brokerConfig.setEnableControllerMode(true);
 
         final NettyServerConfig nettyServerConfig = new NettyServerConfig();

Reply via email to