This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller
by this push:
new 81dc9386a Make broker do not online before get brokerId and
master-slave relationship (#4433)
81dc9386a is described below
commit 81dc9386a1a5f932896bea3ddfa9452cddea1786
Author: rongtong <[email protected]>
AuthorDate: Thu Jun 9 11:48:49 2022 +0800
Make broker do not online before get brokerId and master-slave relationship
(#4433)
* Make broker do not online before get brokerId and master-slave
relationship
* Make broker do not online before get brokerId and master-slave
relationship
* Polish the set isolated code
* Remove disableWrite operation in role change
* Polish the config name
---
.../apache/rocketmq/broker/BrokerController.java | 10 +++--
.../broker/hacontroller/ReplicasManager.java | 26 ++++-------
.../broker/plugin/AbstractPluginMessageStore.java | 8 ----
.../org/apache/rocketmq/common/BrokerConfig.java | 30 ++++++-------
.../java/org/apache/rocketmq/store/CommitLog.java | 4 --
.../apache/rocketmq/store/DefaultMessageStore.java | 24 +++-------
.../org/apache/rocketmq/store/MessageStore.java | 51 +++++++++++++---------
.../ha/autoswitch/AutoSwitchHAConnection.java | 4 +-
.../test/autoswitchrole/AutoSwitchRoleBase.java | 4 +-
9 files changed, 72 insertions(+), 89 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8f961e7c5..0eef0098b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -378,7 +378,7 @@ public class BrokerController {
this.escapeBridge = new EscapeBridge(this);
- if (!this.brokerConfig.isSkipPreOnline()) {
+ if (this.brokerConfig.isEnableSlaveActingMaster() &&
!this.brokerConfig.isSkipPreOnline()) {
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
}
}
@@ -1443,7 +1443,7 @@ public class BrokerController {
this.shouldStartTime = System.currentTimeMillis() +
messageStoreConfig.getDisappearTimeAfterStart();
- if (messageStoreConfig.getTotalReplicas() > 1 &&
this.brokerConfig.isEnableSlaveActingMaster()) {
+ if ((messageStoreConfig.getTotalReplicas() > 1 &&
this.brokerConfig.isEnableSlaveActingMaster()) ||
this.brokerConfig.isEnableControllerMode()) {
isIsolated = true;
}
@@ -1453,7 +1453,7 @@ public class BrokerController {
startBasicService();
- if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog()
&& !this.messageStoreConfig.isDuplicationEnable() &&
!this.brokerConfig.isEnableControllerMode()) {
+ if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog()
&& !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() ==
MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
@@ -2179,6 +2179,10 @@ public class BrokerController {
return replicasManager;
}
+ public void setIsolated(boolean isolated) {
+ isIsolated = isolated;
+ }
+
public boolean isIsolated() {
return this.isIsolated;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index 40ac5d1e2..88a93abd9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -162,8 +162,6 @@ public class ReplicasManager {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{},
replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(),
this.localAddress, newMasterEpoch);
- brokerController.getMessageStore().disableWrite();
-
// Change record
this.masterAddress = this.localAddress;
this.masterEpoch = newMasterEpoch;
@@ -174,17 +172,15 @@ public class ReplicasManager {
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
schedulingCheckSyncStateSet();
-
this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
-
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
- this.brokerController.changeSpecialServiceStatus(true);
-
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
// Notify ha service, change to master
this.haService.changeToMaster(newMasterEpoch);
- brokerController.getMessageStore().enableWrite();
+
this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
+
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
+ this.brokerController.changeSpecialServiceStatus(true);
this.executorService.submit(() -> {
// Register broker to name-srv
@@ -205,8 +201,6 @@ public class ReplicasManager {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to slave, brokerName={},
replicas={}, brokerId={}", this.brokerConfig.getBrokerName(),
this.localAddress, brokerId);
- brokerController.getMessageStore().disableWrite();
-
// Change record
this.masterAddress = newMasterAddress;
this.masterEpoch = newMasterEpoch;
@@ -224,8 +218,6 @@ public class ReplicasManager {
// Notify ha service, change to slave
this.haService.changeToSlave(newMasterAddress, newMasterEpoch,
this.brokerConfig.getBrokerId());
- brokerController.getMessageStore().enableWrite();
-
this.executorService.submit(() -> {
// Register broker to name-srv
try {
@@ -248,8 +240,6 @@ public class ReplicasManager {
this.syncStateSetEpoch = newSyncStateSetEpoch;
this.syncStateSet = new HashSet<>(newSyncStateSet);
this.haService.setSyncStateSet(newSyncStateSet);
- } else {
- LOGGER.info("Sync state set changed failed,
newSyncStateSetEpoch is {} and syncStateSetEpoch is {}", newSyncStateSetEpoch,
this.syncStateSetEpoch);
}
}
}
@@ -286,6 +276,8 @@ public class ReplicasManager {
} else {
changeToSlave(newMasterAddress,
registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
}
+ // Set isolated to false so that the broker can register to namesrv
regularly
+ brokerController.setIsolated(false);
}
return true;
} catch (final Exception e) {
@@ -329,7 +321,7 @@ public class ReplicasManager {
} catch (final Exception e) {
LOGGER.warn("Error happen when get broker {}'s metadata",
this.brokerConfig.getBrokerName(), e);
}
- }, 3 * 1000,
this.brokerConfig.getReplicasManagerSyncBrokerMetadataPeriod(),
TimeUnit.MILLISECONDS);
+ }, 3 * 1000, this.brokerConfig.getSyncBrokerMetadataPeriod(),
TimeUnit.MILLISECONDS);
}
/**
@@ -341,7 +333,7 @@ public class ReplicasManager {
while (tryTimes < 3) {
boolean flag = updateControllerMetadata();
if (flag) {
-
this.scheduledService.scheduleAtFixedRate(this::updateControllerMetadata, 1000
* 3, this.brokerConfig.getReplicasManagerSyncControllerMetadataPeriod(),
TimeUnit.MILLISECONDS);
+
this.scheduledService.scheduleAtFixedRate(this::updateControllerMetadata, 1000
* 3, this.brokerConfig.getSyncControllerMetadataPeriod(),
TimeUnit.MILLISECONDS);
return true;
}
tryTimes++;
@@ -359,7 +351,7 @@ public class ReplicasManager {
final GetMetaDataResponseHeader responseHeader =
this.brokerOuterAPI.getControllerMetaData(address);
if (responseHeader != null &&
StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) {
this.controllerLeaderAddress =
responseHeader.getControllerLeaderAddress();
- LOGGER.info("Change controller leader address to {}",
this.controllerLeaderAddress);
+ LOGGER.info("Update controller leader address to {}",
this.controllerLeaderAddress);
return true;
}
} catch (final Exception e) {
@@ -388,7 +380,7 @@ public class ReplicasManager {
}
}
doReportSyncStateSetChanged(newSyncStateSet);
- }, 3 * 1000,
this.brokerConfig.getReplicasManagerCheckSyncStateSetPeriod(),
TimeUnit.MILLISECONDS);
+ }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(),
TimeUnit.MILLISECONDS);
}
private void doReportSyncStateSetChanged(Set<String> newSyncStateSet) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 74680979c..42542210e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -531,12 +531,4 @@ public abstract class AbstractPluginMessageStore
implements MessageStore {
@Override public boolean isShutdown() {
return next.isShutdown();
}
-
- @Override public void disableWrite() {
- next.disableWrite();
- }
-
- @Override public void enableWrite() {
- next.enableWrite();
- }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 5bf85f86b..1b52d7713 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -307,11 +307,11 @@ public class BrokerConfig extends BrokerIdentity {
private String controllerAddr = "";
- private long replicasManagerSyncBrokerMetadataPeriod = 5 * 1000;
+ private long syncBrokerMetadataPeriod = 5 * 1000;
- private long replicasManagerCheckSyncStateSetPeriod = 5 * 1000;
+ private long checkSyncStateSetPeriod = 5 * 1000;
- private long replicasManagerSyncControllerMetadataPeriod = 10 * 1000;
+ private long syncControllerMetadataPeriod = 10 * 1000;
public long getMaxPopPollingSize() {
return maxPopPollingSize;
@@ -1313,27 +1313,27 @@ public class BrokerConfig extends BrokerIdentity {
this.controllerAddr = controllerAddr;
}
- public long getReplicasManagerSyncBrokerMetadataPeriod() {
- return replicasManagerSyncBrokerMetadataPeriod;
+ public long getSyncBrokerMetadataPeriod() {
+ return syncBrokerMetadataPeriod;
}
- public void setReplicasManagerSyncBrokerMetadataPeriod(long
replicasManagerSyncBrokerMetadataPeriod) {
- this.replicasManagerSyncBrokerMetadataPeriod =
replicasManagerSyncBrokerMetadataPeriod;
+ public void setSyncBrokerMetadataPeriod(long syncBrokerMetadataPeriod) {
+ this.syncBrokerMetadataPeriod = syncBrokerMetadataPeriod;
}
- public long getReplicasManagerCheckSyncStateSetPeriod() {
- return replicasManagerCheckSyncStateSetPeriod;
+ public long getCheckSyncStateSetPeriod() {
+ return checkSyncStateSetPeriod;
}
- public void setReplicasManagerCheckSyncStateSetPeriod(long
replicasManagerCheckSyncStateSetPeriod) {
- this.replicasManagerCheckSyncStateSetPeriod =
replicasManagerCheckSyncStateSetPeriod;
+ public void setCheckSyncStateSetPeriod(long checkSyncStateSetPeriod) {
+ this.checkSyncStateSetPeriod = checkSyncStateSetPeriod;
}
- public long getReplicasManagerSyncControllerMetadataPeriod() {
- return replicasManagerSyncControllerMetadataPeriod;
+ public long getSyncControllerMetadataPeriod() {
+ return syncControllerMetadataPeriod;
}
- public void setReplicasManagerSyncControllerMetadataPeriod(long
replicasManagerSyncControllerMetadataPeriod) {
- this.replicasManagerSyncControllerMetadataPeriod =
replicasManagerSyncControllerMetadataPeriod;
+ public void setSyncControllerMetadataPeriod(long
syncControllerMetadataPeriod) {
+ this.syncControllerMetadataPeriod = syncControllerMetadataPeriod;
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 98b4fc937..b95804bb3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -139,8 +139,6 @@ public class CommitLog implements Swappable {
}
public void start() {
- this.flushManager.start();
- log.info("start commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
this.flushManager.start();
log.info("start commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
flushDiskWatcher.setDaemon(true);
@@ -148,8 +146,6 @@ public class CommitLog implements Swappable {
}
public void shutdown() {
- this.flushManager.shutdown();
- log.info("shutdown commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
this.flushManager.shutdown();
log.info("shutdown commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
flushDiskWatcher.shutdown(true);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b08bc7574..7c57cf11f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -479,7 +479,7 @@ public class DefaultMessageStore implements MessageStore {
}
if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
- && !MessageSysFlag.check(msg.getSysFlag(),
MessageSysFlag.INNER_BATCH_FLAG)) {
+ && !MessageSysFlag.check(msg.getSysFlag(),
MessageSysFlag.INNER_BATCH_FLAG)) {
LOGGER.warn("[BUG]The message had property {} but is not an inner
batch", MessageConst.PROPERTY_INNER_NUM);
return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
@@ -499,7 +499,7 @@ public class DefaultMessageStore implements MessageStore {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
LOGGER.warn("DefaultMessageStore#putMessage:
CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
- msg.getTopic(), msg.getBody().length);
+ elapsedTime, msg.getTopic(), msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
@@ -552,15 +552,15 @@ public class DefaultMessageStore implements MessageStore {
private PutMessageResult
waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
try {
int putMessageTimeout =
- Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
- this.messageStoreConfig.getSlaveTimeout()) + 5000;
+ Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
+ this.messageStoreConfig.getSlaveTimeout()) + 5000;
return putMessageResultFuture.get(putMessageTimeout,
TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException e) {
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
} catch (TimeoutException e) {
LOGGER.error("usually it will never timeout, putMessageTimeout is
much bigger than slaveTimeout and "
- + "flushTimeout so the result can be got anyway, but in
some situations timeout will happen like full gc "
- + "process hangs or other unexpected situations.");
+ + "flushTimeout so the result can be got anyway, but in some
situations timeout will happen like full gc "
+ + "process hangs or other unexpected situations.");
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
}
}
@@ -1282,7 +1282,7 @@ public class DefaultMessageStore implements MessageStore {
int msgIdLength = (inetSocketAddress.getAddress()
instanceof Inet6Address) ? 16 + 4 + 8 : 4 + 4 + 8;
final ByteBuffer msgIdMemory =
ByteBuffer.allocate(msgIdLength);
String msgId =
-
MessageDecoder.createMessageId(msgIdMemory,
MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
+ MessageDecoder.createMessageId(msgIdMemory,
MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
messageIds.put(msgId, cqUnit.getQueueOffset());
nextOffset = cqUnit.getQueueOffset() +
cqUnit.getBatchNum();
if (nextOffset >= maxOffset) {
@@ -1675,16 +1675,6 @@ public class DefaultMessageStore implements MessageStore
{
return runningFlags;
}
- @Override
- public void disableWrite() {
- runningFlags.getAndMakeNotWriteable();
- }
-
- @Override
- public void enableWrite() {
- runningFlags.getAndMakeWriteable();
- }
-
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index b0ca83979..755c89b7b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -69,9 +69,9 @@ public interface MessageStore {
*/
void destroy();
- /** Store a message into store in async manner, the processor can process
the next request
- * rather than wait for result
- * when result is completed, notify the client in async manner
+ /**
+ * Store a message into the store in async manner; the processor can process
the next request rather than wait for
+ * the result. When the result is completed, the client is notified in async manner.
*
* @param msg MessageInstance to store
* @return a CompletableFuture for the result of store operation
@@ -82,6 +82,7 @@ public interface MessageStore {
/**
* Store a batch of messages in async manner
+ *
* @param messageExtBatch the message batch
* @return a CompletableFuture for the result of store operation
*/
@@ -150,8 +151,7 @@ public interface MessageStore {
*
* @param topic Topic name.
* @param queueId Queue ID.
- * @param committed return the max offset in ConsumeQueue if true,
- * or the max offset in CommitLog if false
+ * @param committed return the max offset in ConsumeQueue if true, or the
max offset in CommitLog if false
* @return Maximum offset at present.
*/
long getMaxOffsetInQueue(final String topic, final int queueId, final
boolean committed);
@@ -235,6 +235,7 @@ public interface MessageStore {
/**
* HA runtime information
+ *
* @return runtime information of ha
*/
HARuntimeInfo getHARuntimeInfo();
@@ -305,7 +306,6 @@ public interface MessageStore {
*/
List<SelectMappedBufferResult> getBulkCommitLogData(final long offset,
final int size);
-
/**
* Append data to commit log.
*
@@ -341,7 +341,6 @@ public interface MessageStore {
*/
void updateHaMasterAddress(final String newAddr);
-
/**
* Update master address.
*
@@ -466,7 +465,6 @@ public interface MessageStore {
*/
ConsumeQueueInterface getConsumeQueue(String topic, int queueId);
-
/**
* Get BrokerStatsManager of the messageStore.
*
@@ -476,6 +474,7 @@ public interface MessageStore {
/**
* Will be triggered when a new message is appended to commit log.
+ *
* @param msg the msg that is appended to commit log
* @param result append message result
* @param commitLogFile commit log file
@@ -484,70 +483,82 @@ public interface MessageStore {
/**
* Will be triggered when a new dispatch request is sent to message store.
+ *
* @param dispatchRequest dispatch request
* @param doDispatch do dispatch if true
* @param commitLogFile commit log file
* @param isRecover is from recover process
* @param isFileEnd if the dispatch request represents 'file end'
*/
- void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean
doDispatch, MappedFile commitLogFile, boolean isRecover, boolean isFileEnd);
+ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean
doDispatch, MappedFile commitLogFile,
+ boolean isRecover, boolean isFileEnd);
/**
* Get the message store config
+ *
* @return the message store config
*/
MessageStoreConfig getMessageStoreConfig();
/**
* Get the statistics service
+ *
* @return the statistics service
*/
StoreStatsService getStoreStatsService();
/**
* Get the store checkpoint component
+ *
* @return the checkpoint component
*/
StoreCheckpoint getStoreCheckpoint();
/**
* Get the system clock
+ *
* @return the system clock
*/
SystemClock getSystemClock();
/**
* Get the commit log
+ *
* @return the commit log
*/
CommitLog getCommitLog();
/**
* Get running flags
+ *
* @return running flags
*/
RunningFlags getRunningFlags();
/**
* Get the transient store pool
+ *
* @return the transient store pool
*/
TransientStorePool getTransientStorePool();
/**
* Get the HA service
+ *
* @return the HA service
*/
HAService getHaService();
/**
* Get the allocate-mappedFile service
+ *
* @return the allocate-mappedFile service
*/
AllocateMappedFileService getAllocateMappedFileService();
/**
* Truncate dirty logic files
+ *
* @param phyOffset physical offset
*/
void truncateDirtyLogicFiles(long phyOffset);
@@ -559,37 +570,42 @@ public interface MessageStore {
/**
* Unlock mappedFile
+ *
* @param unlockMappedFile the file that needs to be unlocked
*/
void unlockMappedFile(MappedFile unlockMappedFile);
/**
* Get the perf counter component
+ *
* @return the perf counter component
*/
PerfCounter.Ticks getPerfCounter();
/**
* Get the queue store
+ *
* @return the queue store
*/
ConsumeQueueStore getQueueStore();
/**
* If 'sync disk flush' is configured in this message store
+ *
* @return yes if true, no if false
*/
boolean isSyncDiskFlush();
/**
* If this message store is sync master role
+ *
* @return yes if true, no if false
*/
boolean isSyncMaster();
/**
- * Assign an queue offset and increase it.
- * If there is a race condition, you need to lock/unlock this method
yourself.
+ * Assign a queue offset and increase it. If there is a race condition,
you need to lock/unlock this method
+ * yourself.
*
* @param msg message
* @param messageNum message num
@@ -598,6 +614,7 @@ public interface MessageStore {
/**
* get topic config
+ *
* @param topic topic name
* @return topic config info
*/
@@ -619,6 +636,7 @@ public interface MessageStore {
/**
* Use FileChannel to get data
+ *
* @param offset
* @param size
* @param byteBuffer
@@ -645,7 +663,6 @@ public interface MessageStore {
*/
void wakeupHAClient();
-
/**
* Get master flushed offset.
*
@@ -731,6 +748,7 @@ public interface MessageStore {
/**
* Get last mapped file
+ *
* @param startOffset
* @return true when get the last mapped file, false when get null
*/
@@ -797,13 +815,4 @@ public interface MessageStore {
*/
boolean isShutdown();
- /*
- * Make MessageStore not writeable, default is writeable
- */
- void disableWrite();
-
- /*
- * Make MessageStore not writeable, default is writeable
- */
- void enableWrite();
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 8ee6c985e..7d689b69a 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -323,7 +323,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
byteBufferRead.position(readSocketPos);
maybeExpandInSyncStateSet(slaveMaxOffset);
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
- LOGGER.info("slave[" + clientAddress + "]
request offset " + slaveMaxOffset);
+ LOGGER.debug("slave[" + clientAddress + "]
request offset " + slaveMaxOffset);
break;
default:
LOGGER.error("Current state illegal {}",
currentState);
@@ -526,7 +526,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
final long confirmOffset =
AutoSwitchHAConnection.this.haService.getConfirmOffset();
this.byteBufferHeader.putLong(confirmOffset);
this.byteBufferHeader.flip();
- LOGGER.info("Master send msg, state:{}, size:{}, offset:{},
epoch:{}, epochStartOffset:{}, confirmOffset:{}",
+ LOGGER.debug("Master send msg, state:{}, size:{}, offset:{},
epoch:{}, epochStartOffset:{}, confirmOffset:{}",
currentState, bodySize, nextOffset, entry.getEpoch(),
entry.getStartOffset(), confirmOffset);
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index f085488b2..849788320 100644
---
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -81,8 +81,8 @@ public class AutoSwitchRoleBase {
brokerConfig.setListenPort(brokerListenPort);
brokerConfig.setNamesrvAddr(namesrvAddress);
brokerConfig.setControllerAddr(controllerAddress);
- brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
- brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(2 * 1000);
+ brokerConfig.setSyncBrokerMetadataPeriod(2 * 1000);
+ brokerConfig.setCheckSyncStateSetPeriod(2 * 1000);
brokerConfig.setEnableControllerMode(true);
final NettyServerConfig nettyServerConfig = new NettyServerConfig();