This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new da9695a5f6 ARTEMIS-4704 eliminate unnecessary variable in
ReplicationManager
da9695a5f6 is described below
commit da9695a5f6c3bbeb6fead7658e516895eca15e04
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Mar 29 10:47:58 2024 -0500
ARTEMIS-4704 eliminate unnecessary variable in ReplicationManager
---
.../core/replication/ReplicationManager.java | 66 ++++++++++------------
1 file changed, 30 insertions(+), 36 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index db1696f2c4..f6402bfe55 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -140,9 +140,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
private final Channel replicatingChannel;
- private boolean started;
-
- private volatile boolean enabled;
+ private volatile boolean started;
private final Queue<OperationContext> pendingTokens = new
ConcurrentLinkedQueue<>();
@@ -224,13 +222,13 @@ public final class ReplicationManager implements
ActiveMQComponent {
final byte recordType,
final Persister persister,
final Object record) throws Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new
ReplicationAddMessage(remotingConnection.isBeforeTwoEighteen(), journalID,
operation, id, recordType, persister, record));
}
}
public void appendDeleteRecord(final byte journalID, final long id) throws
Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
}
}
@@ -242,7 +240,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
final byte recordType,
final Persister persister,
final Object record) throws
Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new
ReplicationAddTXMessage(remotingConnection.isBeforeTwoEighteen(), journalID,
operation, txID, id, recordType, persister, record));
}
}
@@ -251,7 +249,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
final long txID,
boolean sync,
final boolean lineUp) throws Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationCommitMessage(journalID, false,
txID), lineUp);
}
}
@@ -260,13 +258,13 @@ public final class ReplicationManager implements
ActiveMQComponent {
final long txID,
final long id,
final EncodingSupport record)
throws Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID,
id, record));
}
}
public void appendDeleteRecordTransactional(final byte journalID, final
long txID, final long id) throws Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID,
id, NullEncoding.instance));
}
}
@@ -274,13 +272,13 @@ public final class ReplicationManager implements
ActiveMQComponent {
public void appendPrepareRecord(final byte journalID,
final long txID,
final EncodingSupport transactionData)
throws Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID,
transactionData));
}
}
public void appendRollbackRecord(final byte journalID, final long txID)
throws Exception {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationCommitMessage(journalID, true,
txID));
}
}
@@ -290,45 +288,45 @@ public final class ReplicationManager implements
ActiveMQComponent {
* @param pageNumber
*/
public void pageClosed(final SimpleString storeName, final long pageNumber)
{
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationPageEventMessage(storeName,
pageNumber, false, remotingConnection.isVersionUsingLongOnPageReplication()));
}
}
public void pageDeleted(final SimpleString storeName, final long
pageNumber) {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationPageEventMessage(storeName,
pageNumber, true, remotingConnection.isVersionUsingLongOnPageReplication()));
}
}
public void pageWrite(final PagedMessage message, final long pageNumber) {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationPageWriteMessage(message,
pageNumber, remotingConnection.isVersionUsingLongOnPageReplication()));
}
}
public void largeMessageBegin(final long messageId) {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new
ReplicationLargeMessageBeginMessage(messageId));
}
}
//we pass in storageManager to generate ID only if enabled
public void largeMessageDelete(final Long messageId, JournalStorageManager
storageManager) {
- if (enabled) {
+ if (started) {
long pendingRecordID = storageManager.generateID();
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId,
pendingRecordID, true));
}
}
public void largeMessageClosed(final Long messageId, JournalStorageManager
storageManager) {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId,
-1, false));
}
}
public void largeMessageWrite(final long messageId, final byte[] body) {
- if (enabled) {
+ if (started) {
sendReplicatePacket(new
ReplicationLargeMessageWriteMessage(messageId, body));
}
}
@@ -368,8 +366,6 @@ public final class ReplicationManager implements
ActiveMQComponent {
}
started = true;
-
- enabled = true;
}
@Override
@@ -402,8 +398,6 @@ public final class ReplicationManager implements
ActiveMQComponent {
slowReplicationChecker = null;
}
- enabled = false;
-
if (clearTokens) {
clearReplicationTokens();
}
@@ -462,7 +456,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
}
private OperationContext sendReplicatePacket(final Packet packet, boolean
lineUp, ReusableLatch done) {
- if (!enabled) {
+ if (!started) {
packet.release();
return null;
}
@@ -474,7 +468,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
final ReplicatePacketRequest request = new
ReplicatePacketRequest(packet, repliToken, done);
replicatePacketRequests.add(request);
replicationStream.execute(() -> {
- if (enabled) {
+ if (started) {
sendReplicatedPackets(false);
} else {
releaseReplicatedPackets(replicatePacketRequests);
@@ -497,7 +491,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
}
private void checkSlowReplication() {
- if (!enabled) {
+ if (!started) {
return;
}
assert checkEventLoop();
@@ -534,7 +528,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
// We try to:
// - save recursive calls of resume due to flushConnection
// - saving flush pending writes *if* the OS hasn't notified that's
writable again
- if (awaitingResume || isFlushing || !enabled) {
+ if (awaitingResume || isFlushing || !started) {
return;
}
if (replicatePacketRequests.isEmpty()) {
@@ -687,7 +681,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
* @throws Exception
*/
public void syncJournalFile(JournalFile jf,
AbstractJournalStorageManager.JournalContent content) throws Exception {
- if (!enabled) {
+ if (!started) {
return;
}
SequentialFile file = jf.getFile().cloneFile();
@@ -701,13 +695,13 @@ public final class ReplicationManager implements
ActiveMQComponent {
}
public void syncLargeMessageFile(SequentialFile file, long size, long id)
throws Exception {
- if (enabled) {
+ if (started) {
sendLargeFile(null, null, id, file, size);
}
}
public void syncPages(SequentialFile file, long id, SimpleString queueName)
throws Exception {
- if (enabled)
+ if (started)
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}
@@ -726,7 +720,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
final long id,
SequentialFile file,
long maxBytesToSend) throws Exception {
- if (!enabled)
+ if (!started)
return;
if (!file.isOpen()) {
file.open();
@@ -801,7 +795,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) throws
ActiveMQException {
- if (enabled)
+ if (started)
sendReplicatePacket(new
ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(),
datafiles, contentType, nodeID, allowsAutoFailBack));
}
@@ -814,7 +808,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
* @param nodeID
*/
public void sendSynchronizationDone(String nodeID, long
initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener)
throws ActiveMQReplicationTimeooutException {
- if (enabled) {
+ if (started) {
if (logger.isTraceEnabled()) {
logger.trace("sendSynchronizationDone ::{}, {}", nodeID,
initialReplicationSyncTimeout);
@@ -865,7 +859,7 @@ public final class ReplicationManager implements
ActiveMQComponent {
ArrayList<Long> idsToSend;
idsToSend = new ArrayList<>(largeMessages.keySet());
- if (enabled)
+ if (started)
sendReplicatePacket(new
ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(),
idsToSend));
}
@@ -878,9 +872,9 @@ public final class ReplicationManager implements
ActiveMQComponent {
* @return
*/
public OperationContext sendPrimaryIsStopping(final PrimaryStopping
finalMessage) {
- logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}",
finalMessage, enabled);
- if (enabled) {
- logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage,
enabled);
+ logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}",
finalMessage, started);
+ if (started) {
+ logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage,
started);
return sendReplicatePacket(new
ReplicationPrimaryIsStoppingMessage(finalMessage));
}
return null;