jbertram commented on code in PR #4996:
URL: https://github.com/apache/activemq-artemis/pull/4996#discussion_r1676449140
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java:
##########
@@ -2084,6 +2085,108 @@ public void testManagementLeak() throws Exception {
assertEquals(0,
server.getManagementService().getResources(BridgeControl.class).length);
}
+ @TestTemplate
+ public void testWaitForPendingAcksOnStop() throws Exception {
+ testPendingAcks(true, false);
+ }
+
+ @TestTemplate
+ public void testWaitForPendingAcksOnPause() throws Exception {
+ testPendingAcks(false, false);
+ }
+
+ @TestTemplate
+ public void testWaitForPendingAcksOnStopWithLargeMessages() throws
Exception {
+ testPendingAcks(true, true);
+ }
+
+ @TestTemplate
+ public void testWaitForPendingAcksOnPauseWithLargeMessages() throws
Exception {
+ testPendingAcks(false, true);
+ }
+
+ private void testPendingAcks(boolean stop, boolean large) throws Exception {
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, null);
+
+ Map<String, Object> server1Params = new HashMap<>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true,
server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+ final long pendingAckTimeout = 2000;
+ final int messageSize = 1024;
+ final int numMessages = 10;
+
+ TransportConfiguration server0tc = new
TransportConfiguration(getConnector(), null);
+ TransportConfiguration server1tc = new
TransportConfiguration(getConnector(), server1Params);
+
+ server0.getConfiguration()
+ .setConnectorConfigurations(Map.of(server1tc.getName(),
server1tc))
+ .setBridgeConfigurations(Arrays.asList(new BridgeConfiguration()
+ .setName("bridge1")
+
.setQueueName(queueName0)
+
.setForwardingAddress(forwardAddress)
+ .setRetryInterval(1000)
+
.setReconnectAttemptsOnSameNode(-1)
+
.setUseDuplicateDetection(false)
+
.setConfirmationWindowSize(numMessages * messageSize / 2)
+
.setMinLargeMessageSize(large ? (messageSize / 2) : (messageSize * 2))
+
.setPendingAckTimeout(pendingAckTimeout)
+
.setStaticConnectors(Arrays.asList(server1tc.getName()))));
+
server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName0).setAddress(testAddress)));
+ server0.start();
+
+ // this interceptor will prevent the target from returning any send acknowledgements
+ Interceptor sendBlockingInterceptor = (packet, connection) -> {
+ if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() ==
PacketImpl.SESS_SEND_LARGE) {
+ return false;
+ }
+ return true;
+ };
+
+
server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName1).setAddress(forwardAddress)));
+ server1.start();
+
server1.getRemotingService().addIncomingInterceptor(sendBlockingInterceptor);
+ Bridge bridge = server0.getClusterManager().getBridges().get("bridge1");
+ Wait.assertTrue(() -> (bridge.isConnected()), 2000, 100);
+
+ locator =
addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc,
server1tc));
+ ClientSessionFactory sf0 =
addSessionFactory(locator.createSessionFactory(server0tc));
+ ClientSession session0 = sf0.createSession(false, true, true);
+ ClientProducer producer0 =
session0.createProducer(SimpleString.of(testAddress));
+ final byte[] bytes = new byte[messageSize];
+
+ final SimpleString propKey = SimpleString.of("testkey");
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session0.createMessage(true);
+ message.putIntProperty(propKey, i);
+ message.getBodyBuffer().writeBytes(bytes);
+ producer0.send(message);
+ }
+
+ session0.close();
+ sf0.close();
+
+ Wait.assertEquals(10L, () ->
bridge.getMetrics().getMessagesPendingAcknowledgement(), 2000, 100);
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact