[
https://issues.apache.org/jira/browse/ARTEMIS-4794?focusedWorklogId=925707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-925707
]
ASF GitHub Bot logged work on ARTEMIS-4794:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jul/24 20:53
Start Date: 12/Jul/24 20:53
Worklog Time Spent: 10m
Work Description: jbertram commented on code in PR #4996:
URL: https://github.com/apache/activemq-artemis/pull/4996#discussion_r1676449140
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java:
##########
@@ -2084,6 +2085,108 @@ public void testManagementLeak() throws Exception {
assertEquals(0,
server.getManagementService().getResources(BridgeControl.class).length);
}
+ @TestTemplate
+ public void testWaitForPendingAcksOnStop() throws Exception {
+ testPendingAcks(true, false);
+ }
+
+ @TestTemplate
+ public void testWaitForPendingAcksOnPause() throws Exception {
+ testPendingAcks(false, false);
+ }
+
+ @TestTemplate
+ public void testWaitForPendingAcksOnStopWithLargeMessages() throws Exception {
+ testPendingAcks(true, true);
+ }
+
+ @TestTemplate
+ public void testWaitForPendingAcksOnPauseWithLargeMessages() throws Exception {
+ testPendingAcks(false, true);
+ }
+
+ private void testPendingAcks(boolean stop, boolean large) throws Exception {
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, null);
+
+ Map<String, Object> server1Params = new HashMap<>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+ final long pendingAckTimeout = 2000;
+ final int messageSize = 1024;
+ final int numMessages = 10;
+
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), null);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+ server0.getConfiguration()
+    .setConnectorConfigurations(Map.of(server1tc.getName(), server1tc))
+    .setBridgeConfigurations(Arrays.asList(new BridgeConfiguration()
+       .setName("bridge1")
+       .setQueueName(queueName0)
+       .setForwardingAddress(forwardAddress)
+       .setRetryInterval(1000)
+       .setReconnectAttemptsOnSameNode(-1)
+       .setUseDuplicateDetection(false)
+       .setConfirmationWindowSize(numMessages * messageSize / 2)
+       .setMinLargeMessageSize(large ? (messageSize / 2) : (messageSize * 2))
+       .setPendingAckTimeout(pendingAckTimeout)
+       .setStaticConnectors(Arrays.asList(server1tc.getName()))));
+ server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName0).setAddress(testAddress)));
+ server0.start();
+
+ // this interceptor will prevent the target from returning any send acknowledgements
+ Interceptor sendBlockingInterceptor = (packet, connection) -> {
+ if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() == PacketImpl.SESS_SEND_LARGE) {
+ return false;
+ }
+ return true;
+ };
+
+ server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName1).setAddress(forwardAddress)));
+ server1.start();
+ server1.getRemotingService().addIncomingInterceptor(sendBlockingInterceptor);
+ Bridge bridge = server0.getClusterManager().getBridges().get("bridge1");
+ Wait.assertTrue(() -> (bridge.isConnected()), 2000, 100);
+
+ locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+ ClientSession session0 = sf0.createSession(false, true, true);
+ ClientProducer producer0 = session0.createProducer(SimpleString.of(testAddress));
+ final byte[] bytes = new byte[messageSize];
+
+ final SimpleString propKey = SimpleString.of("testkey");
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session0.createMessage(true);
+ message.putIntProperty(propKey, i);
+ message.getBodyBuffer().writeBytes(bytes);
+ producer0.send(message);
+ }
+
+ session0.close();
+ sf0.close();
+
+ Wait.assertEquals(10L, () -> bridge.getMetrics().getMessagesPendingAcknowledgement(), 2000, 100);
Review Comment:
Done.
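The new tests drop every SESS_SEND/SESS_SEND_LARGE packet on server1, so none of the 10 forwarded messages is ever acknowledged. They also set setPendingAckTimeout(2000), presumably so that stop or pause waits for the outstanding acknowledgements without hanging forever. That "wait for pending acks, but only up to a timeout" idea can be sketched with plain java.util.concurrent primitives. This is a hypothetical illustration, not BridgeImpl's implementation: PendingAckTracker, onSend, onAck and awaitPendingAcks are invented names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical sketch (not BridgeImpl's code): wait for outstanding send
 * acknowledgements before stopping or pausing, bounded by a timeout.
 */
public class PendingAckTracker {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition drained = lock.newCondition();
    private long pending;

    /** A message was forwarded to the target and awaits its send acknowledgement. */
    public void onSend() {
        lock.lock();
        try {
            pending++;
        } finally {
            lock.unlock();
        }
    }

    /** The target acknowledged one forwarded message. */
    public void onAck() {
        lock.lock();
        try {
            if (pending > 0 && --pending == 0) {
                drained.signalAll(); // wake anyone blocked in awaitPendingAcks
            }
        } finally {
            lock.unlock();
        }
    }

    public long getPending() {
        lock.lock();
        try {
            return pending;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until no acknowledgements are outstanding or the timeout elapses.
     * Returns true if everything was acknowledged, false on timeout or interrupt.
     */
    public boolean awaitPendingAcks(long timeoutMillis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (pending > 0) {
                if (remainingNanos <= 0) {
                    return false; // gave up with acks still outstanding
                }
                remainingNanos = drained.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingAckTracker tracker = new PendingAckTracker();
        for (int i = 0; i < 10; i++) {
            tracker.onSend();
        }
        // As with the test's interceptor, no acks arrive, so the wait ends at the timeout.
        System.out.println(tracker.awaitPendingAcks(200) + " " + tracker.getPending()); // false 10

        Thread acker = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                tracker.onAck();
            }
        });
        acker.start();
        System.out.println(tracker.awaitPendingAcks(2000) + " " + tracker.getPending()); // true 0
        acker.join();
    }
}
```

Returning false on timeout, rather than throwing, leaves the decision about what to do with still-unacknowledged messages to the caller.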
Issue Time Tracking
-------------------
Worklog Id: (was: 925707)
Remaining Estimate: 0h
Time Spent: 10m
> CoreBridge: Duplicate message when bridge is stopped/Lost message when bridge
> is paused while messages being produced to target node.
> -------------------------------------------------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-4794
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4794
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.30.0, 2.34.0, 2.35.0
> Reporter: nmeylan
> Priority: Major
> Attachments: BridgeARTEMIS4794Test.java,
> message-not-deliverable.log.txt
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> +Attached test *BridgeDuplicateMessagesARTEMIS4794Test.java*+ highlights the
> issue with _org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl_.
> Place it under
> _tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge_
> {*}Summary{*}:
> When a bridge is stopped while messages are being produced to the target
> node, it can lead to duplicate messages.
> {*}Description{*}:
> When using a bridge and programmatically *stopping* it while messages are
> being produced to the target node, the source node fails to receive the
> acknowledgements from the target node, so the messages now exist on both the
> source and the target node.
> It appears that setting the "active" flag to false when
> BridgeImpl.StopRunnable is called prevents messages from being acknowledged by
> the _BridgeImpl::sendAcknowledged_ function.
>
> {*}Context{*}:
> This bug appears in my code (a custom plugin) because I start and stop bridges
> programmatically to move messages from one node to another when some
> conditions are met; if they are no longer met, I want to stop moving
> messages.
>
> *Notes:*
> * Changing the bridge configuration parameters
> {_}useDuplicateDetection{_}, {_}confirmationWindowSize{_} or
> _producerWindowSize_ does not mitigate the issue
> * Not related to large messages; I use large messages in my test only to ease
> reproduction
> * Reproduced on 2.30 and 2.34
> * Calling pause() does not create duplicates:
> {_}server.getClusterManager().getBridges().get(bridgeName).pause(){_};
>
>
>
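The stop() race described above can be reduced to a toy model. Suppose, as the report suggests, that acknowledgements arriving after stop() has cleared the active flag are ignored. Then every message still in flight stays on the source even though the target already stored it, and it is forwarded a second time when the bridge restarts. ToyBridge, run and the redeliver-on-restart step are hypothetical simplifications, not Artemis code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy model (not Artemis code) of the reported stop() race: the ack handler
 * ignores acknowledgements once the bridge is inactive, so in-flight messages
 * stay on the source although the target already stored them.
 */
public class StopAckRace {

    static final class ToyBridge {
        private final boolean ignoreAcksWhenInactive;
        private boolean active = true;
        private final Set<Integer> source = new LinkedHashSet<>(); // held on the source until acked
        private final List<Integer> target = new ArrayList<>();    // everything the target stored

        ToyBridge(boolean ignoreAcksWhenInactive) {
            this.ignoreAcksWhenInactive = ignoreAcksWhenInactive;
        }

        void forward(int id) {
            source.add(id);
            target.add(id); // the target has the message; its ack is still in flight
        }

        void stop() {
            active = false;
        }

        /** The target confirmed message {@code id}; normally it is then removed from the source. */
        void sendAcknowledged(int id) {
            if (ignoreAcksWhenInactive && !active) {
                return; // the behaviour described in the report
            }
            source.remove(id);
        }

        /** On restart, anything still held on the source is forwarded again. */
        void restart() {
            active = true;
            for (int id : new ArrayList<>(source)) {
                forward(id);
                sendAcknowledged(id);
            }
        }

        long duplicatesOnTarget() {
            return target.size() - target.stream().distinct().count();
        }
    }

    static long run(boolean ignoreAcksWhenInactive) {
        ToyBridge bridge = new ToyBridge(ignoreAcksWhenInactive);
        for (int id = 0; id < 10; id++) {
            bridge.forward(id);
        }
        bridge.stop(); // stop while all 10 acks are still in flight
        for (int id = 0; id < 10; id++) {
            bridge.sendAcknowledged(id); // the acks arrive after stop()
        }
        bridge.restart();
        return bridge.duplicatesOnTarget();
    }

    public static void main(String[] args) {
        System.out.println("duplicates, late acks ignored:   " + run(true));  // 10
        System.out.println("duplicates, late acks processed: " + run(false)); // 0
    }
}
```

In this model, either processing the late acknowledgements or waiting for them before clearing the flag removes the duplicates.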
> *UPDATE:* When using pause instead of stop in the above scenario, I get messages
> that are no longer deliverable.
> {*}Summary{*}:
> When a bridge is paused while *large* messages are being produced to the target
> node, it can lead to messages that cannot be delivered to new consumers.
> {*}Description{*}:
> When using a bridge and programmatically pausing it while messages are being
> produced to the target node, if large messages are being delivered, the
> thread in _BridgeImpl::deliverLargeMessage_ is not awaited: the bridge is
> paused and only then does the deliverLargeMessage Runnable run, leading to a
> situation where the message won't be delivered to new consumers.
> {*}Notes{*}:
> * PauseRunnable does not wait for the tasks in {{executor}} to complete, and
> deliverLargeMessage does create tasks in the executor.
> ** We can see that deliverLargeMessage's task is still running even after
> PauseRunnable has completed.
> * If I call {{bridge1.onCreditsFlow(true, null);}} to set the flag
> {{blockedOnFlowControl}} to true before calling pause, it prevents new tasks
> from being put on the executor and mitigates the issue, but it feels wrong and
> I think there might still be a race condition.
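The pause problem is an ordering issue on the executor: pause returns while a task queued by deliverLargeMessage is still pending or running. One generic way to wait for such tasks is to queue a no-op "barrier" task behind them and wait for it with a timeout. This assumes the executor is serial and FIFO; a single-threaded ExecutorService stands in for it here, and whether that matches the bridge's executor is an assumption. PauseDrainSketch, awaitQueuedTasks and run are hypothetical names. This is not necessarily how the fix in PR #4996 works.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Toy model (not Artemis code): a single-threaded executor stands in for the
 * bridge's executor, and one slow task stands in for deliverLargeMessage's work.
 */
public class PauseDrainSketch {

    /** Waits for all tasks queued before this call by running a no-op barrier behind them. */
    static boolean awaitQueuedTasks(ExecutorService serialExecutor, long timeoutMillis) {
        Future<?> barrier = serialExecutor.submit(() -> { });
        try {
            barrier.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    /** Returns how many queued delivery tasks finished after "pause" had already returned. */
    static int run(boolean drainBeforePausing) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean paused = new AtomicBoolean(false);
        AtomicInteger ranAfterPause = new AtomicInteger();
        CountDownLatch taskStarted = new CountDownLatch(1);
        CountDownLatch pauseReturned = new CountDownLatch(1);

        // Stand-in for the large-message task: it works until pause returns or 200 ms pass.
        executor.execute(() -> {
            taskStarted.countDown();
            try {
                pauseReturned.await(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (paused.get()) {
                ranAfterPause.incrementAndGet(); // the bridge already considered itself paused
            }
        });
        taskStarted.await();

        if (drainBeforePausing) {
            awaitQueuedTasks(executor, 2000);
        }
        paused.set(true); // "pause" completes here
        pauseReturned.countDown();

        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.SECONDS);
        return ranAfterPause.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks finished after pause, no drain:   " + run(false)); // 1
        System.out.println("tasks finished after pause, with drain: " + run(true));  // 0
    }
}
```

A barrier only covers tasks queued before it. Anything a running task submits afterwards would need the same treatment, which matches the race-condition concern raised in the last note.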
--
This message was sent by Atlassian Jira
(v8.20.10#820010)