This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push: new 99d77df54 [Summer of code] Fix the isuue of frequent disconnection of ha connection (#4399) 99d77df54 is described below commit 99d77df54f2e87be73cd33689292cc85f006d353 Author: hzh0425 <642256...@qq.com> AuthorDate: Thu Jun 2 11:07:19 2022 +0800 [Summer of code] Fix the isuue of frequent disconnection of ha connection (#4399) * fix bug * throw exception in haWriter * fix haconnection can't read msg. * set channel buf = 0 * code review * code review * change client address to slave address * Revert "change client address to slave address" This reverts commit 198bcdb00e6828744514df76659404150c02011f. --- .../org/apache/rocketmq/broker/BrokerStartup.java | 28 ++-- .../container/BrokerContainerProcessor.java | 30 ++-- .../rocketmq/container/BrokerContainerStartup.java | 28 ++-- .../rocketmq/store/ha/DefaultHAConnection.java | 9 +- .../store/ha/autoswitch/AutoSwitchHAClient.java | 176 +++++++++++---------- .../ha/autoswitch/AutoSwitchHAConnection.java | 94 +++++------ .../store/ha/autoswitch/AutoSwitchHAService.java | 14 +- .../org/apache/rocketmq/store/ha/io/HAWriter.java | 30 ++-- 8 files changed, 215 insertions(+), 194 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index ca388b6cf..f4a2fe0c3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -154,20 +154,22 @@ public class BrokerStartup { } } - switch (messageStoreConfig.getBrokerRole()) { - case ASYNC_MASTER: - case SYNC_MASTER: - brokerConfig.setBrokerId(MixAll.MASTER_ID); - break; - case SLAVE: - if (brokerConfig.getBrokerId() <= 0) { - System.out.printf("Slave's brokerId must be > 0"); - System.exit(-3); - } + if (!brokerConfig.isEnableControllerMode()) { + switch (messageStoreConfig.getBrokerRole()) { + case ASYNC_MASTER: + case SYNC_MASTER: + brokerConfig.setBrokerId(MixAll.MASTER_ID); + break; + case SLAVE: + if (brokerConfig.getBrokerId() <= 0) { + System.out.printf("Slave's brokerId must be > 0"); + System.exit(-3); + } - break; - default: - break; + break; + default: + break; + } } if (messageStoreConfig.isEnableDLegerCommitLog()) { diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java index 2512a3307..51cd427dc 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java @@ -117,21 +117,23 @@ public class BrokerContainerProcessor implements NettyRequestProcessor { } if (!messageStoreConfig.isEnableDLegerCommitLog()) { - switch (messageStoreConfig.getBrokerRole()) { - case ASYNC_MASTER: - case SYNC_MASTER: - brokerConfig.setBrokerId(MixAll.MASTER_ID); - break; - case SLAVE: - if (brokerConfig.getBrokerId() <= 0) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("slave broker id must be > 0"); - return response; - } - break; - default: - break; + if (!brokerConfig.isEnableControllerMode()) { + switch (messageStoreConfig.getBrokerRole()) { + case ASYNC_MASTER: + case SYNC_MASTER: + brokerConfig.setBrokerId(MixAll.MASTER_ID); + break; + case SLAVE: + if (brokerConfig.getBrokerId() <= 0) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("slave broker id must be > 0"); + return response; + } + break; + default: + break; + } } if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas() diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java index 94066af8a..d4e94a698 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java @@ -149,20 +149,22 @@ public class BrokerContainerStartup { MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); - switch (messageStoreConfig.getBrokerRole()) { - case ASYNC_MASTER: - case SYNC_MASTER: - brokerConfig.setBrokerId(MixAll.MASTER_ID); - break; - case SLAVE: - if (brokerConfig.getBrokerId() <= 0) { - System.out.printf("Slave's brokerId must be > 0%n"); - System.exit(-3); - } + if (!brokerConfig.isEnableControllerMode()) { + switch (messageStoreConfig.getBrokerRole()) { + case ASYNC_MASTER: + case SYNC_MASTER: + brokerConfig.setBrokerId(MixAll.MASTER_ID); + break; + case SLAVE: + if (brokerConfig.getBrokerId() <= 0) { + System.out.printf("Slave's brokerId must be > 0%n"); + System.exit(-3); + } - break; - default: - break; + break; + default: + break; + } } if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas() diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java index d99099844..aa5da1eb4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.store.SelectMappedBufferResult; public class DefaultHAConnection implements HAConnection { @@ -48,8 +49,12 @@ public class DefaultHAConnection implements HAConnection { this.socketChannel.configureBlocking(false); this.socketChannel.socket().setSoLinger(false, -1); this.socketChannel.socket().setTcpNoDelay(true); - this.socketChannel.socket().setReceiveBufferSize(1024 * 64); - this.socketChannel.socket().setSendBufferSize(1024 * 64); + if (NettySystemConfig.socketSndbufSize > 0) { + this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize); + } + if (NettySystemConfig.socketRcvbufSize > 0) { + this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize); + } this.writeSocketService = new WriteSocketService(this.socketChannel); this.readSocketService = new ReadSocketService(this.socketChannel); this.haService.getConnectionCount().incrementAndGet(); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java index 01272d47a..997deb7e8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java @@ -20,7 +20,6 @@ package org.apache.rocketmq.store.ha.autoswitch; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -58,6 +57,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { * Transfer header buffer size. Schema: state ordinal + maxOffset. */ public static final int TRANSFER_HEADER_SIZE = 4 + 8; + public static final int MIN_HEADER_SIZE = Math.min(HANDSHAKE_HEADER_SIZE, TRANSFER_HEADER_SIZE); private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; private final AtomicReference<String> masterHaAddress = new AtomicReference<>(); @@ -247,7 +247,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(); } - private boolean sendHandshakeHeader() { + private boolean sendHandshakeHeader() throws IOException { this.handshakeHeaderBuffer.position(0); this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE); // Original state @@ -278,11 +278,10 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { result = this.haReader.read(this.socketChannel, this.byteBufferRead); if (!result) { closeMasterAndWait(); - return; } } - private boolean reportSlaveOffset(final long offsetToReport) { + private boolean reportSlaveOffset(final long offsetToReport) throws IOException { this.transferHeaderBuffer.position(0); this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE); this.transferHeaderBuffer.putInt(this.currentState.ordinal()); @@ -291,7 +290,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer); } - private boolean reportSlaveMaxOffset() { + private boolean reportSlaveMaxOffset() throws IOException { boolean result = true; final long maxPhyOffset = this.messageStore.getMaxPhyOffset(); if (maxPhyOffset > this.currentReportedOffset) { @@ -301,7 +300,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { return result; } - public boolean connectMaster() throws ClosedChannelException { + public boolean connectMaster() throws IOException { if (null == this.socketChannel) { String addr = this.masterHaAddress.get(); if (StringUtils.isNotEmpty(addr)) { @@ -391,7 +390,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { /** * Compare the master and slave's epoch file, find consistent point, do truncate. */ - private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) { + private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) throws IOException { if (this.epochCache.getEntrySize() == 0) { // If epochMap is empty, means the broker is a new replicas LOGGER.info("Slave local epochCache is empty, skip truncate log"); @@ -433,94 +432,97 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { int readSocketPos = byteBufferRead.position(); + try { + while (true) { + int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition; + if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) { + int processPosition = AutoSwitchHAClient.this.processPosition; + int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 36); + int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 32); + long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 28); + int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 20); + long masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 16); + long confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 8); + + if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) { + AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize; + AutoSwitchHAClient.this.waitForRunning(1); + LOGGER.error("State not matched, masterState:{}, slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}", + masterState, AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset); + return true; + } - while (true) { - int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition; - if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) { - int processPosition = AutoSwitchHAClient.this.processPosition; - int masterState = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 36); - int bodySize = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 32); - long masterOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 28); - int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 20); - long masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 16); - long confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE - 8); - - if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) { - AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize; - AutoSwitchHAClient.this.waitForRunning(1); - LOGGER.error("State not matched, masterState:{}, slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}", - masterState, AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset); - return true; - } + if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize)) { + switch (AutoSwitchHAClient.this.currentState) { + case HANDSHAKE: + AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE; + // Truncate log + int entrySize = AutoSwitchHAConnection.EPOCH_ENTRY_SIZE; + final int entryNums = bodySize / entrySize; + final ArrayList<EpochEntry> epochEntries = new ArrayList<>(entryNums); + for (int i = 0; i < entryNums; i++) { + int epoch = byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize); + long startOffset = byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize + 4); + epochEntries.add(new EpochEntry(epoch, startOffset)); + } + byteBufferRead.position(readSocketPos); + AutoSwitchHAClient.this.processPosition += bodySize; + LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries); + if (!doTruncate(epochEntries, masterOffset)) { + waitForRunning(1000 * 2); + LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state"); + return false; + } + break; + case TRANSFER: + byte[] bodyData = new byte[bodySize]; + byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE); + byteBufferRead.get(bodyData); + byteBufferRead.position(readSocketPos); + AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize; + + long slavePhyOffset = AutoSwitchHAClient.this.messageStore.getMaxPhyOffset(); + if (slavePhyOffset != 0) { + if (slavePhyOffset != masterOffset) { + LOGGER.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + + slavePhyOffset + " MASTER: " + masterOffset); + return false; + } + } + + // If epoch changed + if (masterEpoch != AutoSwitchHAClient.this.currentReceivedEpoch) { + AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch; + AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, masterEpochStartOffset)); + } + AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset()); - if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize)) { - switch (AutoSwitchHAClient.this.currentState) { - case HANDSHAKE: - AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE; - // Truncate log - int entrySize = AutoSwitchHAConnection.EPOCH_ENTRY_SIZE; - final int entryNums = bodySize / entrySize; - final ArrayList<EpochEntry> epochEntries = new ArrayList<>(entryNums); - for (int i = 0; i < entryNums; i++) { - int epoch = byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize); - long startOffset = byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize + 4); - epochEntries.add(new EpochEntry(epoch, startOffset)); - } - byteBufferRead.position(readSocketPos); - AutoSwitchHAClient.this.processPosition += bodySize; - LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries); - if (!doTruncate(epochEntries, masterOffset)) { - waitForRunning(1000 * 2); - LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state"); - return false; - } - break; - case TRANSFER: - byte[] bodyData = new byte[bodySize]; - byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE); - byteBufferRead.get(bodyData); - byteBufferRead.position(readSocketPos); - AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize; - - long slavePhyOffset = AutoSwitchHAClient.this.messageStore.getMaxPhyOffset(); - if (slavePhyOffset != 0) { - if (slavePhyOffset != masterOffset) { - LOGGER.error("master pushed offset not equal the max phy offset in slave, SLAVE: " - + slavePhyOffset + " MASTER: " + masterOffset); + if (bodySize > 0) { + AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); + } + + if (!reportSlaveMaxOffset()) { + LOGGER.error("AutoSwitchHAClient report max offset to master failed"); return false; } - } - - // If epoch changed - if (masterEpoch != AutoSwitchHAClient.this.currentReceivedEpoch) { - AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch; - AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, masterEpochStartOffset)); - } - AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset()); - - if (bodySize > 0) { - AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); - } - - if (!reportSlaveMaxOffset()) { - LOGGER.error("AutoSwitchHAClient report max offset to master failed"); - return false; - } - break; - default: - break; + break; + default: + break; + } + continue; } - continue; } - } - if (!byteBufferRead.hasRemaining()) { - byteBufferRead.position(AutoSwitchHAClient.this.processPosition); - byteBufferRead.compact(); - AutoSwitchHAClient.this.processPosition = 0; - } + if (!byteBufferRead.hasRemaining()) { + byteBufferRead.position(AutoSwitchHAClient.this.processPosition); + byteBufferRead.compact(); + AutoSwitchHAClient.this.processPosition = 0; + } - break; + break; + } + } catch (final Exception e) { + LOGGER.error("Error when ha client process read request", e); } return true; } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java index a253b4cb1..d8a26e20e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.FlowMonitor; @@ -90,8 +91,12 @@ public class AutoSwitchHAConnection implements HAConnection { this.socketChannel.configureBlocking(false); this.socketChannel.socket().setSoLinger(false, -1); this.socketChannel.socket().setTcpNoDelay(true); - this.socketChannel.socket().setReceiveBufferSize(1024 * 64); - this.socketChannel.socket().setSendBufferSize(1024 * 64); + if (NettySystemConfig.socketSndbufSize > 0) { + this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize); + } + if (NettySystemConfig.socketRcvbufSize > 0) { + this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize); + } this.writeSocketService = new WriteSocketService(this.socketChannel); this.readSocketService = new ReadSocketService(this.socketChannel); this.haService.getConnectionCount().incrementAndGet(); @@ -277,58 +282,55 @@ public class AutoSwitchHAConnection implements HAConnection { @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { while (true) { + boolean processSuccess = true; int readSocketPos = byteBufferRead.position(); int diff = byteBufferRead.position() - ReadSocketService.this.processPosition; - if (diff >= 4) { + if (diff >= AutoSwitchHAClient.MIN_HEADER_SIZE) { int readPosition = ReadSocketService.this.processPosition; HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)]; switch (slaveState) { case HANDSHAKE: - if (diff >= AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE) { - // AddressLength - int addressLength = byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 4); - if (diff < AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) { - break; - } - // Flag(isSyncFromLastFile) - short syncFromLastFileFlag = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 8); - if (syncFromLastFileFlag == 1) { - AutoSwitchHAConnection.this.isSyncFromLastFile = true; - } - // Flag(isAsyncLearner role) - short isAsyncLearner = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 6); - if (isAsyncLearner == 1) { - AutoSwitchHAConnection.this.isAsyncLearner = true; - } - // Address - final byte[] addressData = new byte[addressLength]; - byteBufferRead.position(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE); - byteBufferRead.get(addressData); - AutoSwitchHAConnection.this.slaveAddress = new String(addressData); - - isSlaveSendHandshake = true; - byteBufferRead.position(readSocketPos); - ReadSocketService.this.processPosition += AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength; - LOGGER.info("Receive slave handshake, slaveId:{}, slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", - AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.slaveAddress, - AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); + // AddressLength + int addressLength = byteBufferRead.getInt(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 4); + if (diff < AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength) { + processSuccess = false; + break; + } + // Flag(isSyncFromLastFile) + short syncFromLastFileFlag = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 8); + if (syncFromLastFileFlag == 1) { + AutoSwitchHAConnection.this.isSyncFromLastFile = true; + } + // Flag(isAsyncLearner role) + short isAsyncLearner = byteBufferRead.getShort(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE - 6); + if (isAsyncLearner == 1) { + AutoSwitchHAConnection.this.isAsyncLearner = true; } + // Address + final byte[] addressData = new byte[addressLength]; + byteBufferRead.position(readPosition + AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE); + byteBufferRead.get(addressData); + AutoSwitchHAConnection.this.slaveAddress = new String(addressData); + + isSlaveSendHandshake = true; + byteBufferRead.position(readSocketPos); + ReadSocketService.this.processPosition += AutoSwitchHAClient.HANDSHAKE_HEADER_SIZE + addressLength; + LOGGER.info("Receive slave handshake, slaveAddress:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", + AutoSwitchHAConnection.this.slaveAddress, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: - if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) { - long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4); - ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE; + long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4); + ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE; - AutoSwitchHAConnection.this.slaveAckOffset = slaveMaxOffset; - if (slaveRequestOffset < 0) { - slaveRequestOffset = slaveMaxOffset; - } - byteBufferRead.position(readSocketPos); - maybeExpandInSyncStateSet(slaveMaxOffset); - AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset); - LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset); + AutoSwitchHAConnection.this.slaveAckOffset = slaveMaxOffset; + if (slaveRequestOffset < 0) { + slaveRequestOffset = slaveMaxOffset; } + byteBufferRead.position(readSocketPos); + maybeExpandInSyncStateSet(slaveMaxOffset); + AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset); + LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset); break; default: LOGGER.error("Current state illegal {}", currentState); @@ -339,7 +341,9 @@ public class AutoSwitchHAConnection implements HAConnection { LOGGER.warn("Master change state from {} to {}", currentState, slaveState); changeCurrentState(slaveState); } - continue; + if (processSuccess) { + continue; + } } if (!byteBufferRead.hasRemaining()) { @@ -379,7 +383,7 @@ public class AutoSwitchHAConnection implements HAConnection { } @Override - protected boolean transferData(int maxTransferSize) { + protected boolean transferData(int maxTransferSize) throws Exception { if (null != this.selectMappedBufferResult && maxTransferSize >= 0) { this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize); @@ -489,7 +493,7 @@ public class AutoSwitchHAConnection implements HAConnection { return true; } - private boolean handshakeWithSlave() { + private boolean handshakeWithSlave() throws IOException { // Write Header boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index 773817072..84b047e2d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -46,7 +46,7 @@ import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService; */ public class AutoSwitchHAService extends DefaultHAService { private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - private final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); + private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_")); private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>(); private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>(); private String localAddress; @@ -144,6 +144,16 @@ public class AutoSwitchHAService extends DefaultHAService { @Override public void updateMasterAddress(String newAddr) { } + @Override public void removeConnection(HAConnection conn) { + final Set<String> syncStateSet = getSyncStateSet(); + String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress(); + if (syncStateSet.contains(slave)) { + syncStateSet.remove(slave); + notifySyncStateSetChanged(syncStateSet); + } + super.removeConnection(conn); + } + public void registerSyncStateSetChangedListener(final Consumer<Set<String>> listener) { this.syncStateSetChangedListeners.add(listener); } @@ -168,7 +178,7 @@ public class AutoSwitchHAService extends DefaultHAService { final AutoSwitchHAConnection connection = (AutoSwitchHAConnection) haConnection; final String slaveAddress = connection.getSlaveAddress(); if (currentSyncStateSet.contains(slaveAddress)) { - if (this.defaultMessageStore.getMaxPhyOffset() == connection.getSlaveAckOffset()) { + if (connection.getSlaveAckOffset() < 0 || this.defaultMessageStore.getMaxPhyOffset() == connection.getSlaveAckOffset()) { continue; } if ((System.currentTimeMillis() - connection.getLastCatchUpTimeMs()) > haMaxTimeSlaveNotCatchup) { diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java index 21daaa6f7..4835ca8ba 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java @@ -30,28 +30,22 @@ public class HAWriter { private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final List<HAWriteHook> writeHookList = new ArrayList<>(); - public boolean write(SocketChannel socketChannel, ByteBuffer byteBufferWrite) { + public boolean write(SocketChannel socketChannel, ByteBuffer byteBufferWrite) throws IOException { int writeSizeZeroTimes = 0; while (byteBufferWrite.hasRemaining()) { - try { - int writeSize = socketChannel.write(byteBufferWrite); - for (HAWriteHook writeHook : writeHookList) { - writeHook.afterWrite(writeSize); - } - if (writeSize > 0) { - writeSizeZeroTimes = 0; - } else if (writeSize == 0) { - if (++writeSizeZeroTimes >= 3) { - break; - } - } else { - LOGGER.info("Write socket < 0"); + int writeSize = socketChannel.write(byteBufferWrite); + for (HAWriteHook writeHook : writeHookList) { + writeHook.afterWrite(writeSize); + } + if (writeSize > 0) { + writeSizeZeroTimes = 0; + } else if (writeSize == 0) { + if (++writeSizeZeroTimes >= 3) { + break; } - } catch (IOException e) { - LOGGER.info("Write socket exception", e); - return false; + } else { + LOGGER.info("Write socket < 0"); } - } return !byteBufferWrite.hasRemaining();