[ 
https://issues.apache.org/jira/browse/ARTEMIS-4794?focusedWorklogId=925707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-925707
 ]

ASF GitHub Bot logged work on ARTEMIS-4794:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Jul/24 20:53
            Start Date: 12/Jul/24 20:53
    Worklog Time Spent: 10m 
      Work Description: jbertram commented on code in PR #4996:
URL: https://github.com/apache/activemq-artemis/pull/4996#discussion_r1676449140


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java:
##########
@@ -2084,6 +2085,108 @@ public void testManagementLeak() throws Exception {
       assertEquals(0, server.getManagementService().getResources(BridgeControl.class).length);
    }
 
+   @TestTemplate
+   public void testWaitForPendingAcksOnStop() throws Exception {
+      testPendingAcks(true, false);
+   }
+
+   @TestTemplate
+   public void testWaitForPendingAcksOnPause() throws Exception {
+      testPendingAcks(false, false);
+   }
+
+   @TestTemplate
+   public void testWaitForPendingAcksOnStopWithLargeMessages() throws Exception {
+      testPendingAcks(true, true);
+   }
+
+   @TestTemplate
+   public void testWaitForPendingAcksOnPauseWithLargeMessages() throws Exception {
+      testPendingAcks(false, true);
+   }
+
+   private void testPendingAcks(boolean stop, boolean large) throws Exception {
+      server0 = createClusteredServerWithParams(isNetty(), 0, true, null);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      addTargetParameters(server1Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+      final long pendingAckTimeout = 2000;
+      final int messageSize = 1024;
+      final int numMessages = 10;
+
+      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), null);
+      TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+      server0.getConfiguration()
+             .setConnectorConfigurations(Map.of(server1tc.getName(), server1tc))
+             .setBridgeConfigurations(Arrays.asList(new BridgeConfiguration()
+                                                       .setName("bridge1")
+                                                       .setQueueName(queueName0)
+                                                       .setForwardingAddress(forwardAddress)
+                                                       .setRetryInterval(1000)
+                                                       .setReconnectAttemptsOnSameNode(-1)
+                                                       .setUseDuplicateDetection(false)
+                                                       .setConfirmationWindowSize(numMessages * messageSize / 2)
+                                                       .setMinLargeMessageSize(large ? (messageSize / 2) : (messageSize * 2))
+                                                       .setPendingAckTimeout(pendingAckTimeout)
+                                                       .setStaticConnectors(Arrays.asList(server1tc.getName()))));
+      server0.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName0).setAddress(testAddress)));
+      server0.start();
+
+      // this interceptor will prevent the target from returning any send acknowledgements
+      Interceptor sendBlockingInterceptor = (packet, connection) -> {
+         if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() == PacketImpl.SESS_SEND_LARGE) {
+            return false;
+         }
+         return true;
+      };
+
+      server1.getConfiguration().setQueueConfigs(Arrays.asList(QueueConfiguration.of(queueName1).setAddress(forwardAddress)));
+      server1.start();
+      server1.getRemotingService().addIncomingInterceptor(sendBlockingInterceptor);
+      Bridge bridge = server0.getClusterManager().getBridges().get("bridge1");
+      Wait.assertTrue(() -> (bridge.isConnected()), 2000, 100);
+
+      locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+      ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+      ClientSession session0 = sf0.createSession(false, true, true);
+      ClientProducer producer0 = session0.createProducer(SimpleString.of(testAddress));
+      final byte[] bytes = new byte[messageSize];
+
+      final SimpleString propKey = SimpleString.of("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+         message.putIntProperty(propKey, i);
+         message.getBodyBuffer().writeBytes(bytes);
+         producer0.send(message);
+      }
+
+      session0.close();
+      sf0.close();
+
+      Wait.assertEquals(10L, () -> bridge.getMetrics().getMessagesPendingAcknowledgement(), 2000, 100);

Review Comment:
   Done.





Issue Time Tracking
-------------------

            Worklog Id:     (was: 925707)
    Remaining Estimate: 0h
            Time Spent: 10m

> CoreBridge: Duplicate message when bridge is stopped/Lost message when bridge 
> is paused while messages being produced to target node.
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4794
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4794
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.30.0, 2.34.0, 2.35.0
>            Reporter: nmeylan
>            Priority: Major
>         Attachments: BridgeARTEMIS4794Test.java, 
> message-not-deliverable.log.txt
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> +Attached test *BridgeDuplicateMessagesARTEMIS4794Test.java*+ highlights the 
> issue with _org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl_
> Place it under 
> _tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge_
> {*}Summary{*}:
>      When a bridge is stopped while messages are being produced to the target 
> node, it can lead to duplicate messages.
> {*}Description{*}:
>     When using a bridge and programmatically *stopping* it while messages are 
> being produced to the target node, the source node fails to get the 
> acknowledgement from the target node, and the messages now exist on both the 
> source and the target node.
> It appears that setting the "active" flag to false when 
> BridgeImpl.StopRunnable is called prevents messages from being acknowledged 
> by the _BridgeImpl::sendAcknowledged_ function.
>  
> {*}Context{*}:
> This bug appears in my code (a custom plugin) because I start and stop the 
> bridge programmatically to move messages from one node to another when some 
> conditions are met; if they are no longer met, I want to stop moving 
> messages.
>  
> *Notes:*
>  * Changing the bridge configuration parameters 
> {_}useDuplicateDetection{_}, {_}confirmationWindowSize{_} or 
> _producerWindowSize_ does not help to mitigate the issue
>  * Not related to large messages; I use large messages in my test to ease 
> reproduction 
>  * Reproduced on 2.30 and 2.34
>  * Calling pause() does not create duplicates: 
> {_}server.getClusterManager().getBridges().get(bridgeName).pause(){_};
>  
>  
>  
> *UPDATE:* When using pause instead of stop in the above scenario, I get 
> messages that are no longer deliverable
> {*}Summary{*}:
> When a bridge is paused while *large* messages are being produced to the 
> target node, it can lead to messages that cannot be delivered to new 
> consumers.
> {*}Description{*}:
> When using a bridge and programmatically pausing it while messages are being 
> produced to the target node, if large messages are being delivered, the 
> thread in _BridgeImpl::deliverLargeMessage_ is not awaited. The bridge is 
> paused, and only then is the Runnable of deliverLargeMessage run, leading to 
> a situation where the message won't be delivered to new consumers
> {*}Notes{*}:
>  * PauseRunnable does not wait for tasks in {{executor}} to complete, and 
> deliverLargeMessage does create a task in the executor
>  ** We can see that even after PauseRunnable has completed, 
> deliverLargeMessage's task still runs afterwards.
>  * If I call {{bridge1.onCreditsFlow(true, null);}} to set the flag 
> {{blockedOnFlowControl}} to true before calling pause, it prevents new tasks 
> from being put on the executor and mitigates the issue, but it feels weird 
> and I think there might still be a race condition
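
Both symptoms described above involve in-flight work being cut off when the
bridge changes state. As a rough sketch only, and not the actual BridgeImpl
code, the idea exercised by the tests in the PR (wait a bounded time for
pending acknowledgements before deactivating) might look like the following.
The class PendingAckSketch and all of its methods are hypothetical names
invented for illustration; only the notion of a pending-ack timeout comes from
the PR's test configuration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a bridge. This is NOT the real BridgeImpl.
final class PendingAckSketch {
   // Messages sent to the target that have not been acknowledged yet.
   private final AtomicInteger pendingAcks = new AtomicInteger();
   private volatile boolean active = true;

   // Called when a message is forwarded to the target.
   void messageSent() {
      pendingAcks.incrementAndGet();
   }

   // Called when the target confirms a send. The ack is counted even if
   // stop() is already waiting, so late acks are not silently dropped.
   void sendAcknowledged() {
      pendingAcks.decrementAndGet();
   }

   int pending() {
      return pendingAcks.get();
   }

   boolean isActive() {
      return active;
   }

   // Waits up to timeoutMillis for outstanding acks, then deactivates.
   // Returns true if every pending ack arrived before the deadline.
   boolean stop(long timeoutMillis) throws InterruptedException {
      long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (pendingAcks.get() > 0 && System.nanoTime() - deadline < 0) {
         Thread.sleep(10);
      }
      active = false;
      return pendingAcks.get() == 0;
   }

   public static void main(String[] args) throws Exception {
      PendingAckSketch bridge = new PendingAckSketch();
      for (int i = 0; i < 10; i++) {
         bridge.messageSent();
      }
      // Simulate the target acknowledging the sends on another thread.
      Thread acker = new Thread(() -> {
         for (int i = 0; i < 10; i++) {
            bridge.sendAcknowledged();
         }
      });
      acker.start();
      System.out.println("all acks received before stop: " + bridge.stop(2000));
      acker.join();
   }
}
```

The point of the sketch is the ordering: the flag is flipped only after the
wait, so acknowledgements that arrive during the wait are still processed.
Whether the real fix is structured this way is an assumption.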



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact

