This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new da9695a5f6 ARTEMIS-4704 eliminate unnecessary variable in 
ReplicationManager
da9695a5f6 is described below

commit da9695a5f6c3bbeb6fead7658e516895eca15e04
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Fri Mar 29 10:47:58 2024 -0500

    ARTEMIS-4704 eliminate unnecessary variable in ReplicationManager
---
 .../core/replication/ReplicationManager.java       | 66 ++++++++++------------
 1 file changed, 30 insertions(+), 36 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index db1696f2c4..f6402bfe55 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -140,9 +140,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
 
    private final Channel replicatingChannel;
 
-   private boolean started;
-
-   private volatile boolean enabled;
+   private volatile boolean started;
 
    private final Queue<OperationContext> pendingTokens = new 
ConcurrentLinkedQueue<>();
 
@@ -224,13 +222,13 @@ public final class ReplicationManager implements 
ActiveMQComponent {
                                   final byte recordType,
                                   final Persister persister,
                                   final Object record) throws Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new 
ReplicationAddMessage(remotingConnection.isBeforeTwoEighteen(), journalID, 
operation, id, recordType, persister, record));
       }
    }
 
    public void appendDeleteRecord(final byte journalID, final long id) throws 
Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
       }
    }
@@ -242,7 +240,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
                                             final byte recordType,
                                             final Persister persister,
                                             final Object record) throws 
Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new 
ReplicationAddTXMessage(remotingConnection.isBeforeTwoEighteen(), journalID, 
operation, txID, id, recordType, persister, record));
       }
    }
@@ -251,7 +249,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
                                   final long txID,
                                   boolean sync,
                                   final boolean lineUp) throws Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationCommitMessage(journalID, false, 
txID), lineUp);
       }
    }
@@ -260,13 +258,13 @@ public final class ReplicationManager implements 
ActiveMQComponent {
                                                final long txID,
                                                final long id,
                                                final EncodingSupport record) 
throws Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, 
id, record));
       }
    }
 
    public void appendDeleteRecordTransactional(final byte journalID, final 
long txID, final long id) throws Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, 
id, NullEncoding.instance));
       }
    }
@@ -274,13 +272,13 @@ public final class ReplicationManager implements 
ActiveMQComponent {
    public void appendPrepareRecord(final byte journalID,
                                    final long txID,
                                    final EncodingSupport transactionData) 
throws Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, 
transactionData));
       }
    }
 
    public void appendRollbackRecord(final byte journalID, final long txID) 
throws Exception {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationCommitMessage(journalID, true, 
txID));
       }
    }
@@ -290,45 +288,45 @@ public final class ReplicationManager implements 
ActiveMQComponent {
     * @param pageNumber
     */
    public void pageClosed(final SimpleString storeName, final long pageNumber) 
{
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationPageEventMessage(storeName, 
pageNumber, false, remotingConnection.isVersionUsingLongOnPageReplication()));
       }
    }
 
    public void pageDeleted(final SimpleString storeName, final long 
pageNumber) {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationPageEventMessage(storeName, 
pageNumber, true, remotingConnection.isVersionUsingLongOnPageReplication()));
       }
    }
 
    public void pageWrite(final PagedMessage message, final long pageNumber) {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationPageWriteMessage(message, 
pageNumber, remotingConnection.isVersionUsingLongOnPageReplication()));
       }
    }
 
    public void largeMessageBegin(final long messageId) {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new 
ReplicationLargeMessageBeginMessage(messageId));
       }
    }
 
    //we pass in storageManager to generate ID only if enabled
    public void largeMessageDelete(final Long messageId, JournalStorageManager 
storageManager) {
-      if (enabled) {
+      if (started) {
          long pendingRecordID = storageManager.generateID();
          sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, 
pendingRecordID, true));
       }
    }
 
    public void largeMessageClosed(final Long messageId, JournalStorageManager 
storageManager) {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, 
-1, false));
       }
    }
 
    public void largeMessageWrite(final long messageId, final byte[] body) {
-      if (enabled) {
+      if (started) {
          sendReplicatePacket(new 
ReplicationLargeMessageWriteMessage(messageId, body));
       }
    }
@@ -368,8 +366,6 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       }
 
       started = true;
-
-      enabled = true;
    }
 
    @Override
@@ -402,8 +398,6 @@ public final class ReplicationManager implements 
ActiveMQComponent {
          slowReplicationChecker = null;
       }
 
-      enabled = false;
-
       if (clearTokens) {
          clearReplicationTokens();
       }
@@ -462,7 +456,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
    }
 
    private OperationContext sendReplicatePacket(final Packet packet, boolean 
lineUp, ReusableLatch done) {
-      if (!enabled) {
+      if (!started) {
          packet.release();
          return null;
       }
@@ -474,7 +468,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       final ReplicatePacketRequest request = new 
ReplicatePacketRequest(packet, repliToken, done);
       replicatePacketRequests.add(request);
       replicationStream.execute(() -> {
-         if (enabled) {
+         if (started) {
             sendReplicatedPackets(false);
          } else {
             releaseReplicatedPackets(replicatePacketRequests);
@@ -497,7 +491,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
    }
 
    private void checkSlowReplication() {
-      if (!enabled) {
+      if (!started) {
          return;
       }
       assert checkEventLoop();
@@ -534,7 +528,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       // We try to:
       // - save recursive calls of resume due to flushConnection
       // - saving flush pending writes *if* the OS hasn't notified that's 
writable again
-      if (awaitingResume || isFlushing || !enabled) {
+      if (awaitingResume || isFlushing || !started) {
          return;
       }
       if (replicatePacketRequests.isEmpty()) {
@@ -687,7 +681,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
     * @throws Exception
     */
    public void syncJournalFile(JournalFile jf, 
AbstractJournalStorageManager.JournalContent content) throws Exception {
-      if (!enabled) {
+      if (!started) {
          return;
       }
       SequentialFile file = jf.getFile().cloneFile();
@@ -701,13 +695,13 @@ public final class ReplicationManager implements 
ActiveMQComponent {
    }
 
    public void syncLargeMessageFile(SequentialFile file, long size, long id) 
throws Exception {
-      if (enabled) {
+      if (started) {
          sendLargeFile(null, null, id, file, size);
       }
    }
 
    public void syncPages(SequentialFile file, long id, SimpleString queueName) 
throws Exception {
-      if (enabled)
+      if (started)
          sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }
 
@@ -726,7 +720,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
                               final long id,
                               SequentialFile file,
                               long maxBytesToSend) throws Exception {
-      if (!enabled)
+      if (!started)
          return;
       if (!file.isOpen()) {
          file.open();
@@ -801,7 +795,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
                                     
AbstractJournalStorageManager.JournalContent contentType,
                                     String nodeID,
                                     boolean allowsAutoFailBack) throws 
ActiveMQException {
-      if (enabled)
+      if (started)
          sendReplicatePacket(new 
ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), 
datafiles, contentType, nodeID, allowsAutoFailBack));
    }
 
@@ -814,7 +808,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
     * @param nodeID
     */
    public void sendSynchronizationDone(String nodeID, long 
initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener) 
throws ActiveMQReplicationTimeooutException {
-      if (enabled) {
+      if (started) {
 
          if (logger.isTraceEnabled()) {
             logger.trace("sendSynchronizationDone ::{}, {}", nodeID, 
initialReplicationSyncTimeout);
@@ -865,7 +859,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       ArrayList<Long> idsToSend;
       idsToSend = new ArrayList<>(largeMessages.keySet());
 
-      if (enabled)
+      if (started)
          sendReplicatePacket(new 
ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), 
idsToSend));
    }
 
@@ -878,9 +872,9 @@ public final class ReplicationManager implements 
ActiveMQComponent {
     * @return
     */
    public OperationContext sendPrimaryIsStopping(final PrimaryStopping 
finalMessage) {
-      logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}", 
finalMessage, enabled);
-      if (enabled) {
-         logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage, 
enabled);
+      logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}", 
finalMessage, started);
+      if (started) {
+         logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage, 
started);
          return sendReplicatePacket(new 
ReplicationPrimaryIsStoppingMessage(finalMessage));
       }
       return null;

Reply via email to