[ 
https://issues.apache.org/jira/browse/ARTEMIS-5308?focusedWorklogId=956766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-956766
 ]

ASF GitHub Bot logged work on ARTEMIS-5308:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Feb/25 19:02
            Start Date: 12/Feb/25 19:02
    Worklog Time Spent: 10m 
      Work Description: tabish121 commented on code in PR #5499:
URL: https://github.com/apache/activemq-artemis/pull/5499#discussion_r1953239642


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedMessageWithLargeHeaderTest.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class AmqpReplicatedMessageWithLargeHeaderTest extends 
AmqpReplicatedTestSupport {
+
+   private String amqpLiveURI = "tcp://localhost:" + (AMQP_PORT + 10);
+   private String amqpBackupURI = "tcp://localhost:" + (AMQP_PORT + 10);
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final 
boolean live) {
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final 
boolean live) {
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+   @BeforeEach
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      createReplicatedConfigs();
+      primaryConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", 
amqpLiveURI + "?protocols=AMQP");
+      backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", 
amqpBackupURI + "?protocols=AMQP");
+      primaryServer.start();
+      backupServer.start();
+
+      primaryServer.getServer().addAddressInfo(new AddressInfo(getQueueName(), 
RoutingType.ANYCAST));
+      
primaryServer.getServer().createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
+
+
+      waitForRemoteBackupSynchronization(backupServer.getServer());
+   }
+
+   public SimpleString getQueueName() {
+      return SimpleString.of("replicatedTest");
+   }
+
+
+   @Test
+   @Timeout(30)
+   public void testSimpleSend() throws Exception {
+      try {
+         String bodyText = "TEST";
+         AmqpClient client = createAmqpClient(new URI(amqpLiveURI));
+         AmqpConnection connection = client.createConnection();
+         addConnection(connection);
+         connection.connect();
+
+         AmqpSession sessionBefore = connection.createSession();
+         AmqpSender senderBefore = 
sessionBefore.createSender(getQueueName().toString());
+         AmqpReceiver receiverBefore = 
sessionBefore.createReceiver(getQueueName().toString());
+
+         Queue queueView = 
primaryServer.getServer().locateQueue(getQueueName());
+         assertNotNull(queueView);
+         assertEquals(0, queueView.getMessageCount());
+
+         AmqpMessage msgBefore = new AmqpMessage();
+         msgBefore.setDurable(true);
+         msgBefore.setApplicationProperty("id", "0");
+         msgBefore.setBytes(bodyText.getBytes());
+
+         sessionBefore.begin();
+         senderBefore.send(msgBefore);
+         sessionBefore.commit();
+
+         receiverBefore.flow(1);
+         AmqpMessage msgBeforeReceived = receiverBefore.receive(10, 
TimeUnit.SECONDS);
+         assertNotNull(msgBeforeReceived);
+         assertEquals("0", msgBeforeReceived.getApplicationProperty("id"));
+         msgBeforeReceived.accept(true);
+
+         receiverBefore.flow(1);

Review Comment:
   Because the AMQP test client does not perform a drain when receiveNoWait is 
called this check will likely not tell you if an errant message was dispatched. 
 It would be more valid to flow two credits and check after first message is 
read that there are no more messages in the local prefetch buffer of the client.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedMessageWithLargeHeaderTest.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class AmqpReplicatedMessageWithLargeHeaderTest extends 
AmqpReplicatedTestSupport {
+
+   private String amqpLiveURI = "tcp://localhost:" + (AMQP_PORT + 10);
+   private String amqpBackupURI = "tcp://localhost:" + (AMQP_PORT + 10);
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final 
boolean live) {
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final 
boolean live) {
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+   @BeforeEach
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      createReplicatedConfigs();
+      primaryConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", 
amqpLiveURI + "?protocols=AMQP");
+      backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", 
amqpBackupURI + "?protocols=AMQP");
+      primaryServer.start();
+      backupServer.start();
+
+      primaryServer.getServer().addAddressInfo(new AddressInfo(getQueueName(), 
RoutingType.ANYCAST));
+      
primaryServer.getServer().createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
+
+
+      waitForRemoteBackupSynchronization(backupServer.getServer());
+   }
+
+   public SimpleString getQueueName() {
+      return SimpleString.of("replicatedTest");
+   }
+
+
+   @Test
+   @Timeout(30)
+   public void testSimpleSend() throws Exception {
+      try {
+         String bodyText = "TEST";
+         AmqpClient client = createAmqpClient(new URI(amqpLiveURI));
+         AmqpConnection connection = client.createConnection();
+         addConnection(connection);
+         connection.connect();
+
+         AmqpSession sessionBefore = connection.createSession();
+         AmqpSender senderBefore = 
sessionBefore.createSender(getQueueName().toString());
+         AmqpReceiver receiverBefore = 
sessionBefore.createReceiver(getQueueName().toString());
+
+         Queue queueView = 
primaryServer.getServer().locateQueue(getQueueName());
+         assertNotNull(queueView);
+         assertEquals(0, queueView.getMessageCount());
+
+         AmqpMessage msgBefore = new AmqpMessage();
+         msgBefore.setDurable(true);
+         msgBefore.setApplicationProperty("id", "0");
+         msgBefore.setBytes(bodyText.getBytes());
+
+         sessionBefore.begin();
+         senderBefore.send(msgBefore);
+         sessionBefore.commit();
+
+         receiverBefore.flow(1);
+         AmqpMessage msgBeforeReceived = receiverBefore.receive(10, 
TimeUnit.SECONDS);
+         assertNotNull(msgBeforeReceived);
+         assertEquals("0", msgBeforeReceived.getApplicationProperty("id"));
+         msgBeforeReceived.accept(true);
+
+         receiverBefore.flow(1);
+         assertNull(receiverBefore.receiveNoWait());
+
+         AmqpMessage messageWithLargeHeader = new AmqpMessage();
+         messageWithLargeHeader.setDurable(true);
+         messageWithLargeHeader.setApplicationProperty("large-property", 
"z".repeat(512 * 1024));
+         messageWithLargeHeader.setBytes(bodyText.getBytes());
+
+         sessionBefore.begin();
+         try {
+            senderBefore.send(messageWithLargeHeader);
+            fail();
+         } catch (Exception e) {
+            assertTrue(e.getMessage().contains("AMQ149005"));
+         }
+
+         senderBefore.close();
+         receiverBefore.close();
+         sessionBefore.close();
+
+         AmqpSession sessionAfter = connection.createSession();
+         AmqpSender senderAfter = 
sessionAfter.createSender(getQueueName().toString());
+         AmqpReceiver receiverAfter = 
sessionAfter.createReceiver(getQueueName().toString());
+
+         AmqpMessage msgAfter = new AmqpMessage();
+         msgAfter.setDurable(true);
+         msgAfter.setApplicationProperty("id", "1");
+         msgAfter.setBytes(bodyText.getBytes());
+
+         sessionAfter.begin();
+         senderAfter.send(msgAfter);
+         sessionAfter.commit();
+
+         receiverAfter.flow(1);
+         AmqpMessage msgAfterReceived = receiverAfter.receive(10, 
TimeUnit.SECONDS);
+         assertNotNull(msgAfterReceived);
+         assertEquals("1", msgAfterReceived.getApplicationProperty("id"));
+         msgAfterReceived.accept(true);
+
+         receiverAfter.flow(1);
+         assertNull(receiverAfter.receiveNoWait());

Review Comment:
   Same comment as above.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 956766)
    Time Spent: 20m  (was: 10m)

> Messages are blocked after sending too large header
> ---------------------------------------------------
>
>                 Key: ARTEMIS-5308
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5308
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Domenico Francesco Bruscino
>            Assignee: Domenico Francesco Bruscino
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When using a high available setup, an issue occurs when sending a message 
> with a header that is too large. The main broker rejects the message, but at 
> the same time, it seems that it is trying to perform replication to the 
> backup broker.
> The backup broker also rejects this replication message, but after this it 
> seems that all further messages are blocked. Only after killing the backup 
> broker, the messages will be processed. Seems like this is related to journal 
> related operations being blocked until backup acknowledges the completion of 
> synchronization 
> (https://activemq.apache.org/components/artemis/documentation/latest/ha.html#replication)
> {code:java}
> [Thread-2 
> (org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector)] 
> 17:41:25,854 WARN  [org.apache.activemq.artemis.core.server] AMQ222086: error 
> handling packet ReplicationAddTXMessage[type=92, channelID=2, 
> responseAsync=false, requiresResponse=false, correlationID=-1] for replication
> org.apache.activemq.artemis.api.core.ActiveMQIOErrorException: AMQ149005: 
> Message of 524513 bytes is bigger than the max record size of 102400 bytes. 
> You should try to move large application properties to the message body.
>       at 
> org.apache.activemq.artemis.core.journal.impl.JournalImpl.checkRecordSize(JournalImpl.java:1011)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.journal.impl.JournalImpl.appendAddRecordTransactional(JournalImpl.java:1269)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.journal.Journal.appendAddRecordTransactional(Journal.java:188)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.journal.impl.JournalBase.appendAddRecordTransactional(JournalBase.java:115)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.journal.impl.JournalImpl.appendAddRecordTransactional(JournalImpl.java:105)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.replication.ReplicationEndpoint.handleAppendAddTXRecord(ReplicationEndpoint.java:766)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.replication.ReplicationEndpoint.handlePacket(ReplicationEndpoint.java:211)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:852)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:419)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:392)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1334)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection$1.run(InVMConnection.java:223)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
>  ~[classes/:?]
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  [?:?]
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  [?:?]
>       at 
> org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
>  [classes/:?]
> [Thread-4 (ActiveMQ-server-ActiveMQServerImpl::Server 1)] 17:41:25,887 WARN  
> [org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback] 
> AMQ149005: Message of 524513 bytes is bigger than the max record size of 
> 102400 bytes. You should try to move large application properties to the 
> message body.
> org.apache.activemq.artemis.api.core.ActiveMQIOErrorException: AMQ149005: 
> Message of 524513 bytes is bigger than the max record size of 102400 bytes. 
> You should try to move large application properties to the message body.
>       at 
> org.apache.activemq.artemis.core.journal.impl.JournalImpl.checkRecordSize(JournalImpl.java:1011)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.journal.impl.JournalImpl.appendAddRecordTransactional(JournalImpl.java:1269)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.replication.ReplicatedJournal.appendAddRecordTransactional(ReplicatedJournal.java:192)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.storeMessageTransactional(AbstractJournalStorageManager.java:571)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl.storeDurableReference(PostOfficeImpl.java:1790)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl.processRouteToDurableQueues(PostOfficeImpl.java:1770)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl.processRoute(PostOfficeImpl.java:1706)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl.route(PostOfficeImpl.java:1244)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl.route(PostOfficeImpl.java:1135)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.doSend(ServerSessionImpl.java:2368)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1963)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback.inSessionSend(AMQPSessionCallback.java:624)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback.lambda$serverSend$2(AMQPSessionCallback.java:580)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
>  ~[classes/:?]
>       at 
> org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
>  ~[classes/:?]
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  [?:?]
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  [?:?]
>       at 
> org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
>  [classes/:?]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to