This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new da9695a5f6 ARTEMIS-4704 eliminate unnecessary variable in ReplicationManager da9695a5f6 is described below commit da9695a5f6c3bbeb6fead7658e516895eca15e04 Author: Justin Bertram <jbert...@apache.org> AuthorDate: Fri Mar 29 10:47:58 2024 -0500 ARTEMIS-4704 eliminate unnecessary variable in ReplicationManager --- .../core/replication/ReplicationManager.java | 66 ++++++++++------------ 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index db1696f2c4..f6402bfe55 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -140,9 +140,7 @@ public final class ReplicationManager implements ActiveMQComponent { private final Channel replicatingChannel; - private boolean started; - - private volatile boolean enabled; + private volatile boolean started; private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>(); @@ -224,13 +222,13 @@ public final class ReplicationManager implements ActiveMQComponent { final byte recordType, final Persister persister, final Object record) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationAddMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, id, recordType, persister, record)); } } public void appendDeleteRecord(final byte journalID, final long id) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationDeleteMessage(journalID, id)); } } @@ -242,7 +240,7 @@ public final class ReplicationManager implements ActiveMQComponent { final byte recordType, final Persister persister, final Object record) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationAddTXMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, txID, id, recordType, persister, record)); } } @@ -251,7 +249,7 @@ public final class ReplicationManager implements ActiveMQComponent { final long txID, boolean sync, final boolean lineUp) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp); } } @@ -260,13 +258,13 @@ public final class ReplicationManager implements ActiveMQComponent { final long txID, final long id, final EncodingSupport record) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record)); } } public void appendDeleteRecordTransactional(final byte journalID, final long txID, final long id) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance)); } } @@ -274,13 +272,13 @@ public final class ReplicationManager implements ActiveMQComponent { public void appendPrepareRecord(final byte journalID, final long txID, final EncodingSupport transactionData) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData)); } } public void appendRollbackRecord(final byte journalID, final long txID) throws Exception { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationCommitMessage(journalID, true, txID)); } } @@ -290,45 +288,45 @@ public final class ReplicationManager implements ActiveMQComponent { * @param pageNumber */ public void pageClosed(final SimpleString storeName, final long pageNumber) { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false, remotingConnection.isVersionUsingLongOnPageReplication())); } } public void pageDeleted(final SimpleString storeName, final long pageNumber) { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true, remotingConnection.isVersionUsingLongOnPageReplication())); } } public void pageWrite(final PagedMessage message, final long pageNumber) { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber, remotingConnection.isVersionUsingLongOnPageReplication())); } } public void largeMessageBegin(final long messageId) { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationLargeMessageBeginMessage(messageId)); } } //we pass in storageManager to generate ID only if enabled public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) { - if (enabled) { + if (started) { long pendingRecordID = storageManager.generateID(); sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID, true)); } } public void largeMessageClosed(final Long messageId, JournalStorageManager storageManager) { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, -1, false)); } } public void largeMessageWrite(final long messageId, final byte[] body) { - if (enabled) { + if (started) { sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body)); } } @@ -368,8 +366,6 @@ public final class ReplicationManager implements ActiveMQComponent { } started = true; - - enabled = true; } @Override @@ -402,8 +398,6 @@ public final class ReplicationManager implements ActiveMQComponent { slowReplicationChecker = null; } - enabled = false; - if (clearTokens) { clearReplicationTokens(); } @@ -462,7 +456,7 @@ public final class ReplicationManager implements ActiveMQComponent { } private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, ReusableLatch done) { - if (!enabled) { + if (!started) { packet.release(); return null; } @@ -474,7 +468,7 @@ public final class ReplicationManager implements ActiveMQComponent { final ReplicatePacketRequest request = new ReplicatePacketRequest(packet, repliToken, done); replicatePacketRequests.add(request); replicationStream.execute(() -> { - if (enabled) { + if (started) { sendReplicatedPackets(false); } else { releaseReplicatedPackets(replicatePacketRequests); @@ -497,7 +491,7 @@ public final class ReplicationManager implements ActiveMQComponent { } private void checkSlowReplication() { - if (!enabled) { + if (!started) { return; } assert checkEventLoop(); @@ -534,7 +528,7 @@ public final class ReplicationManager implements ActiveMQComponent { // We try to: // - save recursive calls of resume due to flushConnection // - saving flush pending writes *if* the OS hasn't notified that's writable again - if (awaitingResume || isFlushing || !enabled) { + if (awaitingResume || isFlushing || !started) { return; } if (replicatePacketRequests.isEmpty()) { @@ -687,7 +681,7 @@ public final class ReplicationManager implements ActiveMQComponent { * @throws Exception */ public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception { - if (!enabled) { + if (!started) { return; } SequentialFile file = jf.getFile().cloneFile(); @@ -701,13 +695,13 @@ public final class ReplicationManager implements ActiveMQComponent { } public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception { - if (enabled) { + if (started) { sendLargeFile(null, null, id, file, size); } } public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception { - if (enabled) + if (started) sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); } @@ -726,7 +720,7 @@ public final class ReplicationManager implements ActiveMQComponent { final long id, SequentialFile file, long maxBytesToSend) throws Exception { - if (!enabled) + if (!started) return; if (!file.isOpen()) { file.open(); @@ -801,7 +795,7 @@ public final class ReplicationManager implements ActiveMQComponent { AbstractJournalStorageManager.JournalContent contentType, String nodeID, boolean allowsAutoFailBack) throws ActiveMQException { - if (enabled) + if (started) sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), datafiles, contentType, nodeID, allowsAutoFailBack)); } @@ -814,7 +808,7 @@ public final class ReplicationManager implements ActiveMQComponent { * @param nodeID */ public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener) throws ActiveMQReplicationTimeooutException { - if (enabled) { + if (started) { if (logger.isTraceEnabled()) { logger.trace("sendSynchronizationDone ::{}, {}", nodeID, initialReplicationSyncTimeout); @@ -865,7 +859,7 @@ public final class ReplicationManager implements ActiveMQComponent { ArrayList<Long> idsToSend; idsToSend = new ArrayList<>(largeMessages.keySet()); - if (enabled) + if (started) sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), idsToSend)); } @@ -878,9 +872,9 @@ public final class ReplicationManager implements ActiveMQComponent { * @return */ public OperationContext sendPrimaryIsStopping(final PrimaryStopping finalMessage) { - logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}", finalMessage, enabled); - if (enabled) { - logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage, enabled); + logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}", finalMessage, started); + if (started) { + logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage, started); return sendReplicatePacket(new ReplicationPrimaryIsStoppingMessage(finalMessage)); } return null;