This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 6dd88378c1 ARTEMIS-5308 Replicate journal after local checks
6dd88378c1 is described below
commit 6dd88378c1aa3aa9eaeb493abdec5fe7ad015d68
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Thu Feb 6 09:18:08 2025 +0100
ARTEMIS-5308 Replicate journal after local checks
Say there's an issue with the record on the local journal, it would fail on
the local journal before reaching the replica.
---
.../impl/journal/JournalStorageManager.java | 1 -
.../core/replication/ReplicatedJournal.java | 44 ++++++------
.../core/replication/ReplicatedJournalTest.java | 82 ++++++++++++++++++++++
.../integration/replication/ReplicationTest.java | 57 +++++++++++++++
4 files changed, 161 insertions(+), 23 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index bf5808ed36..e5693b56b4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -527,7 +527,6 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(),
logger.getName());
logger.debug("Message header too large for {}", largeMessage);
- new Exception("Trace").printStackTrace();
throw
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize,
maxRecordSize);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 636063fe98..b688272c60 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -115,8 +115,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("Append record id = {} recordType = {}", id, recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD,
id, recordType, persister, record);
localJournal.appendAddRecord(id, recordType, persister, record, sync);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD,
id, recordType, persister, record);
}
/**
@@ -137,8 +137,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("Append record id = {} recordType = {}", id, recordType);
}
- replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD,
id, recordType, persister, record);
localJournal.appendAddRecord(id, recordType, persister, record, sync,
completionCallback);
+ replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD,
id, recordType, persister, record);
}
@Override
@@ -151,8 +151,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("Append record id = {} recordType = {}", id, recordType);
}
- replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record);
localJournal.appendAddEvent(id, recordType, persister, record, sync,
completionCallback);
+ replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record);
}
/**
@@ -188,8 +188,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("Append record txID={} recordType = {}", id, recordType);
}
- replicationManager.appendAddRecordTransactional(journalID,
ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
localJournal.appendAddRecordTransactional(txID, id, recordType,
persister, record);
+ replicationManager.appendAddRecordTransactional(journalID,
ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
}
/**
@@ -203,8 +203,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendCommit txID={}", txID);
}
- replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync);
+ replicationManager.appendCommitRecord(journalID, txID, sync, true);
}
@Override
@@ -212,8 +212,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendCommit {}", txID);
}
- replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync, callback);
+ replicationManager.appendCommitRecord(journalID, txID, sync, true);
}
@Override
@@ -224,8 +224,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendCommit {}", txID);
}
- replicationManager.appendCommitRecord(journalID, txID, sync,
lineUpContext);
localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
+ replicationManager.appendCommitRecord(journalID, txID, sync,
lineUpContext);
}
/**
@@ -239,8 +239,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
- replicationManager.appendDeleteRecord(journalID, id);
localJournal.appendDeleteRecord(id, sync);
+ replicationManager.appendDeleteRecord(journalID, id);
}
/**
@@ -254,8 +254,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
- replicationManager.appendDeleteRecord(journalID, id);
localJournal.tryAppendDeleteRecord(id, updateCallback, sync);
+ replicationManager.appendDeleteRecord(journalID, id);
}
@Override
@@ -265,8 +265,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
- replicationManager.appendDeleteRecord(journalID, id);
localJournal.appendDeleteRecord(id, sync, completionCallback);
+ replicationManager.appendDeleteRecord(journalID, id);
}
@Override
@@ -277,8 +277,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
- replicationManager.appendDeleteRecord(journalID, id);
localJournal.tryAppendDeleteRecord(id, sync, updateCallback,
completionCallback);
+ replicationManager.appendDeleteRecord(journalID, id);
}
/**
* @param txID
@@ -306,8 +306,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete txID={} id={}", txID, id);
}
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id,
record);
localJournal.appendDeleteRecordTransactional(txID, id, record);
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id,
record);
}
/**
@@ -321,8 +321,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete (noencoding) txID={} id={}", txID, id);
}
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
localJournal.appendDeleteRecordTransactional(txID, id);
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
}
/**
@@ -351,8 +351,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendPrepare txID={}", txID);
}
- replicationManager.appendPrepareRecord(journalID, txID, transactionData);
localJournal.appendPrepareRecord(txID, transactionData, sync);
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
}
@Override
@@ -363,8 +363,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendPrepare txID={}", txID);
}
- replicationManager.appendPrepareRecord(journalID, txID, transactionData);
localJournal.appendPrepareRecord(txID, transactionData, sync, callback);
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
}
/**
@@ -378,8 +378,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendRollback {}", txID);
}
- replicationManager.appendRollbackRecord(journalID, txID);
localJournal.appendRollbackRecord(txID, sync);
+ replicationManager.appendRollbackRecord(journalID, txID);
}
@Override
@@ -387,8 +387,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendRollback {}", txID);
}
- replicationManager.appendRollbackRecord(journalID, txID);
localJournal.appendRollbackRecord(txID, sync, callback);
+ replicationManager.appendRollbackRecord(journalID, txID);
}
/**
@@ -435,8 +435,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id,
recordType);
}
- replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
+ replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
}
@Override
@@ -449,8 +449,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id,
recordType);
}
- replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
localJournal.tryAppendUpdateRecord(id, recordType, persister, record,
updateCallback, sync, replaceable);
+ replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
}
@Override
@@ -463,8 +463,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id,
journalRecordType);
}
- replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
localJournal.appendUpdateRecord(id, journalRecordType, persister,
record, sync, completionCallback);
+ replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
}
@Override
@@ -479,8 +479,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id,
journalRecordType);
}
- replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
localJournal.tryAppendUpdateRecord(id, journalRecordType, persister,
record, sync, replaceableUpdate, updateCallback, completionCallback);
+ replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
}
/**
@@ -516,8 +516,8 @@ public class ReplicatedJournal implements Journal {
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord txid={} id = {} , recordType = {}",
txID, id, recordType);
}
- replicationManager.appendAddRecordTransactional(journalID,
ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
localJournal.appendUpdateRecordTransactional(txID, id, recordType,
persister, record);
+ replicationManager.appendAddRecordTransactional(journalID,
ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
}
/**
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/replication/ReplicatedJournalTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/replication/ReplicatedJournalTest.java
new file mode 100644
index 0000000000..96bf2a6dc5
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/replication/ReplicatedJournalTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.replication;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ReplicatedJournalTest {
+
+ private static final int NO_INVOCATION = 0;
+ private static final int JOURNAL_INVOCATION = 1;
+ private static final int REPLICA_INVOCATION = 2;
+
+ @Test
+ public void testAppendInvocationOrder() throws Exception {
+ AtomicInteger firstInvocation = new AtomicInteger(NO_INVOCATION);
+
+ Journal mockJournal = Mockito.mock(Journal.class, invocationOnMock -> {
+ if (invocationOnMock.getMethod().getName().startsWith("append") ||
+ invocationOnMock.getMethod().getName().startsWith("tryAppend")) {
+ firstInvocation.compareAndSet(NO_INVOCATION, JOURNAL_INVOCATION);
+ }
+ return null;
+ });
+
+ ReplicationManager replicationManager =
Mockito.mock(ReplicationManager.class, invocationOnMock -> {
+ if (invocationOnMock.getMethod().getName().startsWith("append") ||
+ invocationOnMock.getMethod().getName().startsWith("tryAppend")) {
+ firstInvocation.compareAndSet(NO_INVOCATION, REPLICA_INVOCATION);
+ }
+ return null;
+ });
+
+ ReplicatedJournal replicatedJournal = new ReplicatedJournal((byte)0,
mockJournal, replicationManager);
+
+ for (Method method : ReplicatedJournal.class.getDeclaredMethods()) {
+ if (method.getName().startsWith("append") ||
+ method.getName().startsWith("tryAppend")) {
+ List<Object> args = new ArrayList<>();
+ for (Class parameterType : method.getParameterTypes()) {
+ if (boolean.class.equals(parameterType)) {
+ args.add(false);
+ } else if (byte.class.equals(parameterType)) {
+ args.add((byte)0);
+ } else if (long.class.equals(parameterType)) {
+ args.add((long)0);
+ } else {
+ args.add(null);
+ }
+ }
+
+ method.invoke(replicatedJournal, args.toArray());
+
+ assertEquals(JOURNAL_INVOCATION, firstInvocation.get(),
method.toString());
+
+ firstInvocation.set(NO_INVOCATION);
+ }
+ }
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index c89ce25db7..b14aa311d5 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -363,6 +364,62 @@ public final class ReplicationTest extends
ActiveMQTestBase {
}
}
+ @TestTemplate
+ public void testSendMessageWithLargeHeader() throws Exception {
+ setupServer(true, TestInterceptor.class.getName());
+
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ try (ClientSession producerSession = sf.createSession();
+ ClientSession consumerSession = sf.createSession()) {
+
+ producerSession.start();
+ producerSession.createQueue(QueueConfiguration.of(ADDRESS));
+ ClientProducer producer = producerSession.createProducer(ADDRESS);
+
+ consumerSession.start();
+ ClientConsumer consumer = consumerSession.createConsumer(ADDRESS);
+
+ ClientMessage messageBefore = producerSession.createMessage(true);
+ setBody(0, messageBefore);
+ messageBefore.putIntProperty("counter", 0);
+ producer.send(messageBefore);
+
+ ClientMessage messageReceivedBefore = consumer.receive(1000);
+ assertNotNull(messageReceivedBefore, "Message should exist!");
+ assertMessageBody(0, messageReceivedBefore);
+ assertEquals(0,
messageReceivedBefore.getIntProperty("counter").intValue());
+ messageReceivedBefore.acknowledge();
+
+ ClientMessage messageWithLargeHeader =
producerSession.createMessage(true);
+ setBody(1, messageWithLargeHeader);
+ messageWithLargeHeader.putIntProperty("counter", 1);
+ messageWithLargeHeader.putStringProperty("large-property",
"z".repeat(512 * 1024));
+ try {
+ producer.send(messageWithLargeHeader);
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("AMQ149005"));
+ }
+
+ ClientMessage messageAfter = producerSession.createMessage(true);
+ setBody(2, messageAfter);
+ messageAfter.putIntProperty("counter", 2);
+ producer.send(messageAfter);
+
+ ClientMessage messageReceivedAfter = consumer.receive(1000);
+ assertNotNull(messageReceivedAfter, "Message should exist!");
+ assertMessageBody(2, messageReceivedAfter);
+ assertEquals(2,
messageReceivedAfter.getIntProperty("counter").intValue());
+ messageReceivedAfter.acknowledge();
+
+ assertNull(consumer.receiveImmediate());
+ }
+ }
+
@TestTemplate
public void testExceptionSettingActionBefore() throws Exception {
OperationContext ctx = OperationContextImpl.getContext(factory);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact