tabish121 commented on code in PR #5499: URL: https://github.com/apache/activemq-artemis/pull/5499#discussion_r1953239642
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedMessageWithLargeHeaderTest.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.largemessages; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import 
org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class AmqpReplicatedMessageWithLargeHeaderTest extends AmqpReplicatedTestSupport { + + private String amqpLiveURI = "tcp://localhost:" + (AMQP_PORT + 10); + private String amqpBackupURI = "tcp://localhost:" + (AMQP_PORT + 10); + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMAcceptor(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMConnector(live); + } + + @BeforeEach + @Override + public void setUp() throws Exception { + super.setUp(); + + createReplicatedConfigs(); + primaryConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", amqpLiveURI + "?protocols=AMQP"); + backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", amqpBackupURI + "?protocols=AMQP"); + primaryServer.start(); + backupServer.start(); + + primaryServer.getServer().addAddressInfo(new AddressInfo(getQueueName(), RoutingType.ANYCAST)); + primaryServer.getServer().createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST)); + + + waitForRemoteBackupSynchronization(backupServer.getServer()); + } + + public SimpleString getQueueName() { + return SimpleString.of("replicatedTest"); + } + + + @Test + @Timeout(30) + public void testSimpleSend() throws Exception { + try { + String bodyText = "TEST"; + AmqpClient client = createAmqpClient(new URI(amqpLiveURI)); + 
AmqpConnection connection = client.createConnection(); + addConnection(connection); + connection.connect(); + + AmqpSession sessionBefore = connection.createSession(); + AmqpSender senderBefore = sessionBefore.createSender(getQueueName().toString()); + AmqpReceiver receiverBefore = sessionBefore.createReceiver(getQueueName().toString()); + + Queue queueView = primaryServer.getServer().locateQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + + AmqpMessage msgBefore = new AmqpMessage(); + msgBefore.setDurable(true); + msgBefore.setApplicationProperty("id", "0"); + msgBefore.setBytes(bodyText.getBytes()); + + sessionBefore.begin(); + senderBefore.send(msgBefore); + sessionBefore.commit(); + + receiverBefore.flow(1); + AmqpMessage msgBeforeReceived = receiverBefore.receive(10, TimeUnit.SECONDS); + assertNotNull(msgBeforeReceived); + assertEquals("0", msgBeforeReceived.getApplicationProperty("id")); + msgBeforeReceived.accept(true); + + receiverBefore.flow(1); Review Comment: Because the AMQP test client does not perform a drain when receiveNoWait is called, this check will likely not tell you if an errant message was dispatched. It would be more valid to flow two credits and check after the first message is read that there are no more messages in the local prefetch buffer of the client. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedMessageWithLargeHeaderTest.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.largemessages; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class AmqpReplicatedMessageWithLargeHeaderTest extends AmqpReplicatedTestSupport { + + private String amqpLiveURI = 
"tcp://localhost:" + (AMQP_PORT + 10); + private String amqpBackupURI = "tcp://localhost:" + (AMQP_PORT + 10); + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMAcceptor(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMConnector(live); + } + + @BeforeEach + @Override + public void setUp() throws Exception { + super.setUp(); + + createReplicatedConfigs(); + primaryConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", amqpLiveURI + "?protocols=AMQP"); + backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", amqpBackupURI + "?protocols=AMQP"); + primaryServer.start(); + backupServer.start(); + + primaryServer.getServer().addAddressInfo(new AddressInfo(getQueueName(), RoutingType.ANYCAST)); + primaryServer.getServer().createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST)); + + + waitForRemoteBackupSynchronization(backupServer.getServer()); + } + + public SimpleString getQueueName() { + return SimpleString.of("replicatedTest"); + } + + + @Test + @Timeout(30) + public void testSimpleSend() throws Exception { + try { + String bodyText = "TEST"; + AmqpClient client = createAmqpClient(new URI(amqpLiveURI)); + AmqpConnection connection = client.createConnection(); + addConnection(connection); + connection.connect(); + + AmqpSession sessionBefore = connection.createSession(); + AmqpSender senderBefore = sessionBefore.createSender(getQueueName().toString()); + AmqpReceiver receiverBefore = sessionBefore.createReceiver(getQueueName().toString()); + + Queue queueView = primaryServer.getServer().locateQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + + AmqpMessage msgBefore = new AmqpMessage(); + msgBefore.setDurable(true); + 
msgBefore.setApplicationProperty("id", "0"); + msgBefore.setBytes(bodyText.getBytes()); + + sessionBefore.begin(); + senderBefore.send(msgBefore); + sessionBefore.commit(); + + receiverBefore.flow(1); + AmqpMessage msgBeforeReceived = receiverBefore.receive(10, TimeUnit.SECONDS); + assertNotNull(msgBeforeReceived); + assertEquals("0", msgBeforeReceived.getApplicationProperty("id")); + msgBeforeReceived.accept(true); + + receiverBefore.flow(1); + assertNull(receiverBefore.receiveNoWait()); + + AmqpMessage messageWithLargeHeader = new AmqpMessage(); + messageWithLargeHeader.setDurable(true); + messageWithLargeHeader.setApplicationProperty("large-property", "z".repeat(512 * 1024)); + messageWithLargeHeader.setBytes(bodyText.getBytes()); + + sessionBefore.begin(); + try { + senderBefore.send(messageWithLargeHeader); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().contains("AMQ149005")); + } + + senderBefore.close(); + receiverBefore.close(); + sessionBefore.close(); + + AmqpSession sessionAfter = connection.createSession(); + AmqpSender senderAfter = sessionAfter.createSender(getQueueName().toString()); + AmqpReceiver receiverAfter = sessionAfter.createReceiver(getQueueName().toString()); + + AmqpMessage msgAfter = new AmqpMessage(); + msgAfter.setDurable(true); + msgAfter.setApplicationProperty("id", "1"); + msgAfter.setBytes(bodyText.getBytes()); + + sessionAfter.begin(); + senderAfter.send(msgAfter); + sessionAfter.commit(); + + receiverAfter.flow(1); + AmqpMessage msgAfterReceived = receiverAfter.receive(10, TimeUnit.SECONDS); + assertNotNull(msgAfterReceived); + assertEquals("1", msgAfterReceived.getApplicationProperty("id")); + msgAfterReceived.accept(true); + + receiverAfter.flow(1); + assertNull(receiverAfter.receiveNoWait()); Review Comment: Same comment as above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact