This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6dd88378c1 ARTEMIS-5308 Replicate journal after local checks
6dd88378c1 is described below

commit 6dd88378c1aa3aa9eaeb493abdec5fe7ad015d68
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Thu Feb 6 09:18:08 2025 +0100

    ARTEMIS-5308 Replicate journal after local checks
    
    Say there's an issue with the record on the local journal, it would fail on 
the local journal before reaching the replica.
---
 .../impl/journal/JournalStorageManager.java        |  1 -
 .../core/replication/ReplicatedJournal.java        | 44 ++++++------
 .../core/replication/ReplicatedJournalTest.java    | 82 ++++++++++++++++++++++
 .../integration/replication/ReplicationTest.java   | 57 +++++++++++++++
 4 files changed, 161 insertions(+), 23 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index bf5808ed36..e5693b56b4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -527,7 +527,6 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
                
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(),
 logger.getName());
 
                logger.debug("Message header too large for {}", largeMessage);
-               new Exception("Trace").printStackTrace();
 
                throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, 
maxRecordSize);
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 636063fe98..b688272c60 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -115,8 +115,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("Append record id = {} recordType = {}", id, recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, 
id, recordType, persister, record);
       localJournal.appendAddRecord(id, recordType, persister, record, sync);
+      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, 
id, recordType, persister, record);
    }
 
    /**
@@ -137,8 +137,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("Append record id = {} recordType = {}", id, recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, 
id, recordType, persister, record);
       localJournal.appendAddRecord(id, recordType, persister, record, sync, 
completionCallback);
+      replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, 
id, recordType, persister, record);
    }
 
    @Override
@@ -151,8 +151,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("Append record id = {} recordType = {}", id, recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record);
       localJournal.appendAddEvent(id, recordType, persister, record, sync, 
completionCallback);
+      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record);
    }
 
    /**
@@ -188,8 +188,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("Append record txID={} recordType = {}", id, recordType);
       }
-      replicationManager.appendAddRecordTransactional(journalID, 
ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
       localJournal.appendAddRecordTransactional(txID, id, recordType, 
persister, record);
+      replicationManager.appendAddRecordTransactional(journalID, 
ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
    }
 
    /**
@@ -203,8 +203,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendCommit txID={}", txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID, sync, true);
       localJournal.appendCommitRecord(txID, sync);
+      replicationManager.appendCommitRecord(journalID, txID, sync, true);
    }
 
    @Override
@@ -212,8 +212,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendCommit {}", txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID, sync, true);
       localJournal.appendCommitRecord(txID, sync, callback);
+      replicationManager.appendCommitRecord(journalID, txID, sync, true);
    }
 
    @Override
@@ -224,8 +224,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendCommit {}", txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID, sync, 
lineUpContext);
       localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
+      replicationManager.appendCommitRecord(journalID, txID, sync, 
lineUpContext);
    }
 
    /**
@@ -239,8 +239,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendDelete {}", id);
       }
-      replicationManager.appendDeleteRecord(journalID, id);
       localJournal.appendDeleteRecord(id, sync);
+      replicationManager.appendDeleteRecord(journalID, id);
    }
 
    /**
@@ -254,8 +254,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendDelete {}", id);
       }
-      replicationManager.appendDeleteRecord(journalID, id);
       localJournal.tryAppendDeleteRecord(id, updateCallback, sync);
+      replicationManager.appendDeleteRecord(journalID, id);
    }
 
    @Override
@@ -265,8 +265,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendDelete {}", id);
       }
-      replicationManager.appendDeleteRecord(journalID, id);
       localJournal.appendDeleteRecord(id, sync, completionCallback);
+      replicationManager.appendDeleteRecord(journalID, id);
    }
 
    @Override
@@ -277,8 +277,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendDelete {}", id);
       }
-      replicationManager.appendDeleteRecord(journalID, id);
       localJournal.tryAppendDeleteRecord(id, sync, updateCallback, 
completionCallback);
+      replicationManager.appendDeleteRecord(journalID, id);
    }
    /**
     * @param txID
@@ -306,8 +306,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendDelete txID={} id={}", txID, id);
       }
-      replicationManager.appendDeleteRecordTransactional(journalID, txID, id, 
record);
       localJournal.appendDeleteRecordTransactional(txID, id, record);
+      replicationManager.appendDeleteRecordTransactional(journalID, txID, id, 
record);
    }
 
    /**
@@ -321,8 +321,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendDelete (noencoding) txID={} id={}", txID, id);
       }
-      replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
       localJournal.appendDeleteRecordTransactional(txID, id);
+      replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
    }
 
    /**
@@ -351,8 +351,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendPrepare txID={}", txID);
       }
-      replicationManager.appendPrepareRecord(journalID, txID, transactionData);
       localJournal.appendPrepareRecord(txID, transactionData, sync);
+      replicationManager.appendPrepareRecord(journalID, txID, transactionData);
    }
 
    @Override
@@ -363,8 +363,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendPrepare txID={}", txID);
       }
-      replicationManager.appendPrepareRecord(journalID, txID, transactionData);
       localJournal.appendPrepareRecord(txID, transactionData, sync, callback);
+      replicationManager.appendPrepareRecord(journalID, txID, transactionData);
    }
 
    /**
@@ -378,8 +378,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendRollback {}", txID);
       }
-      replicationManager.appendRollbackRecord(journalID, txID);
       localJournal.appendRollbackRecord(txID, sync);
+      replicationManager.appendRollbackRecord(journalID, txID);
    }
 
    @Override
@@ -387,8 +387,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendRollback {}", txID);
       }
-      replicationManager.appendRollbackRecord(journalID, txID);
       localJournal.appendRollbackRecord(txID, sync, callback);
+      replicationManager.appendRollbackRecord(journalID, txID);
    }
 
    /**
@@ -435,8 +435,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, 
recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
       localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
+      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
    }
 
    @Override
@@ -449,8 +449,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, 
recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
       localJournal.tryAppendUpdateRecord(id, recordType, persister, record, 
updateCallback, sync, replaceable);
+      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
    }
 
    @Override
@@ -463,8 +463,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, 
journalRecordType);
       }
-      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
       localJournal.appendUpdateRecord(id, journalRecordType, persister, 
record, sync, completionCallback);
+      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
    }
 
    @Override
@@ -479,8 +479,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, 
journalRecordType);
       }
-      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
       localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, 
record, sync, replaceableUpdate, updateCallback, completionCallback);
+      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
    }
 
    /**
@@ -516,8 +516,8 @@ public class ReplicatedJournal implements Journal {
       if (logger.isTraceEnabled()) {
          logger.trace("AppendUpdateRecord txid={} id = {} , recordType = {}", 
txID, id, recordType);
       }
-      replicationManager.appendAddRecordTransactional(journalID, 
ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
       localJournal.appendUpdateRecordTransactional(txID, id, recordType, 
persister, record);
+      replicationManager.appendAddRecordTransactional(journalID, 
ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
    }
 
    /**
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/replication/ReplicatedJournalTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/replication/ReplicatedJournalTest.java
new file mode 100644
index 0000000000..96bf2a6dc5
--- /dev/null
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/replication/ReplicatedJournalTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.replication;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ReplicatedJournalTest {
+
+   private static final int NO_INVOCATION = 0;
+   private static final int JOURNAL_INVOCATION = 1;
+   private static final int REPLICA_INVOCATION = 2;
+
+   @Test
+   public void testAppendInvocationOrder() throws Exception {
+      AtomicInteger firstInvocation = new AtomicInteger(NO_INVOCATION);
+
+      Journal mockJournal = Mockito.mock(Journal.class, invocationOnMock -> {
+         if (invocationOnMock.getMethod().getName().startsWith("append") ||
+             invocationOnMock.getMethod().getName().startsWith("tryAppend")) {
+            firstInvocation.compareAndSet(NO_INVOCATION, JOURNAL_INVOCATION);
+         }
+         return null;
+      });
+
+      ReplicationManager replicationManager = 
Mockito.mock(ReplicationManager.class, invocationOnMock -> {
+         if (invocationOnMock.getMethod().getName().startsWith("append") ||
+             invocationOnMock.getMethod().getName().startsWith("tryAppend")) {
+            firstInvocation.compareAndSet(NO_INVOCATION, REPLICA_INVOCATION);
+         }
+         return null;
+      });
+
+      ReplicatedJournal replicatedJournal = new ReplicatedJournal((byte)0, 
mockJournal, replicationManager);
+
+      for (Method method : ReplicatedJournal.class.getDeclaredMethods()) {
+         if (method.getName().startsWith("append") ||
+             method.getName().startsWith("tryAppend")) {
+            List<Object> args = new ArrayList<>();
+            for (Class parameterType : method.getParameterTypes()) {
+               if (boolean.class.equals(parameterType)) {
+                  args.add(false);
+               } else if (byte.class.equals(parameterType)) {
+                  args.add((byte)0);
+               } else if (long.class.equals(parameterType)) {
+                  args.add((long)0);
+               } else {
+                  args.add(null);
+               }
+            }
+
+            method.invoke(replicatedJournal, args.toArray());
+
+            assertEquals(JOURNAL_INVOCATION, firstInvocation.get(), 
method.toString());
+
+            firstInvocation.set(NO_INVOCATION);
+         }
+      }
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index c89ce25db7..b14aa311d5 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -363,6 +364,62 @@ public final class ReplicationTest extends 
ActiveMQTestBase {
       }
    }
 
+   @TestTemplate
+   public void testSendMessageWithLargeHeader() throws Exception {
+      setupServer(true, TestInterceptor.class.getName());
+
+      manager = liveServer.getReplicationManager();
+      waitForComponent(manager);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      try (ClientSession producerSession = sf.createSession();
+           ClientSession consumerSession = sf.createSession()) {
+
+         producerSession.start();
+         producerSession.createQueue(QueueConfiguration.of(ADDRESS));
+         ClientProducer producer = producerSession.createProducer(ADDRESS);
+
+         consumerSession.start();
+         ClientConsumer consumer = consumerSession.createConsumer(ADDRESS);
+
+         ClientMessage messageBefore = producerSession.createMessage(true);
+         setBody(0, messageBefore);
+         messageBefore.putIntProperty("counter", 0);
+         producer.send(messageBefore);
+
+         ClientMessage messageReceivedBefore = consumer.receive(1000);
+         assertNotNull(messageReceivedBefore, "Message should exist!");
+         assertMessageBody(0, messageReceivedBefore);
+         assertEquals(0, 
messageReceivedBefore.getIntProperty("counter").intValue());
+         messageReceivedBefore.acknowledge();
+
+         ClientMessage messageWithLargeHeader = 
producerSession.createMessage(true);
+         setBody(1, messageWithLargeHeader);
+         messageWithLargeHeader.putIntProperty("counter", 1);
+         messageWithLargeHeader.putStringProperty("large-property", 
"z".repeat(512 * 1024));
+         try {
+            producer.send(messageWithLargeHeader);
+            fail();
+         } catch (Exception e) {
+            assertTrue(e.getMessage().contains("AMQ149005"));
+         }
+
+         ClientMessage messageAfter = producerSession.createMessage(true);
+         setBody(2, messageAfter);
+         messageAfter.putIntProperty("counter", 2);
+         producer.send(messageAfter);
+
+         ClientMessage messageReceivedAfter = consumer.receive(1000);
+         assertNotNull(messageReceivedAfter, "Message should exist!");
+         assertMessageBody(2, messageReceivedAfter);
+         assertEquals(2, 
messageReceivedAfter.getIntProperty("counter").intValue());
+         messageReceivedAfter.acknowledge();
+
+         assertNull(consumer.receiveImmediate());
+      }
+   }
+
    @TestTemplate
    public void testExceptionSettingActionBefore() throws Exception {
       OperationContext ctx = OperationContextImpl.getContext(factory);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to