[ 
https://issues.apache.org/jira/browse/ARTEMIS-4794?focusedWorklogId=925710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-925710
 ]

ASF GitHub Bot logged work on ARTEMIS-4794:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Jul/24 20:54
            Start Date: 12/Jul/24 20:54
    Worklog Time Spent: 10m 
      Work Description: jbertram commented on code in PR #4996:
URL: https://github.com/apache/activemq-artemis/pull/4996#discussion_r1676449918


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java:
##########
@@ -363,49 +355,43 @@ public Executor getExecutor() {
    }
 
    @Override
-   public void stop() throws Exception {
-      if (stopping) {
-         return;
+   public synchronized void stop() throws Exception {
+      if (state == State.STOPPING || state == State.STOPPED || state == 
State.PAUSING) {
+         logger.debug("Bridge {} state is {}. Ignoring call to stop.", 
configuration.getName(), state);
+         if (state == State.PAUSING) {
+            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "stopped", state);
+         } else {
+            return;
+         }
       }
 
-      stopping = true;
+      state = State.STOPPING;
 
-      logger.debug("Bridge {} being stopped", configuration.getName());
+      logger.debug("Bridge {} is stopping", configuration.getName());
 
-      if (futureScheduledReconnection != null) {
-         futureScheduledReconnection.cancel(true);
+      if (scheduledReconnection != null) {
+         scheduledReconnection.cancel(true);
       }
 
       executor.execute(new StopRunnable());
+   }
 
-      if (notificationService != null) {
-         TypedProperties props = new TypedProperties();
-         props.putSimpleStringProperty(SimpleString.of("name"), 
SimpleString.of(configuration.getName()));
-         Notification notification = new Notification(nodeUUID.toString(), 
CoreNotificationType.BRIDGE_STOPPED, props);
-         try {
-            notificationService.sendNotification(notification);
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.broadcastBridgeStoppedError(e);
+   @Override
+   public synchronized void pause() throws Exception {
+      if (state == State.STOPPING || state == State.STOPPED || state == 
State.PAUSING || state == State.PAUSED) {
+         logger.debug("Bridge {} state is {}. Ignoring call to pause.", 
configuration.getName(), state);
+         if (state == State.STOPPING || state == State.STOPPED) {
+            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "paused", state);
+         } else {
+            return;

Review Comment:
   Done.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 925710)
    Time Spent: 40m  (was: 0.5h)

> CoreBridge: Duplicate message when bridge is stopped/Lost message when bridge 
> is paused while messages being produced to target node.
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4794
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4794
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.30.0, 2.34.0, 2.35.0
>            Reporter: nmeylan
>            Priority: Major
>         Attachments: BridgeARTEMIS4794Test.java, 
> message-not-deliverable.log.txt
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> +Attached test *BridgeDuplicateMessagesARTEMIS4794Test.java*+ highlights the 
> issue with _org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl_
> Place it under 
> _tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge_
> {*}Summary{*}:
>      When a bridge is stopped while messages being produced to the target 
> node, it can lead to duplicate messages.
> {*}Description{*}:
>     When Using bridge and programmatically *stopping* it while messages are 
> being produced to the target node, the source node fails to get the 
> acknowledgement from target node and messages now exists on the source and 
> the target node.
> It appears that the "active" flag being set to false when 
> BridgeImpl.StopRunnable is called prevent message to be acknowledged by 
> _BridgeImpl::sendAcknowledged_ function
>  
> {*}Context{*}:
> This bug appear in my code (a custom plugin) because is start and stop Bridge 
> programmatically to move messages from one node to another when some 
> conditions are met, if they are no longer met I want to stop the moving of 
> messages.
>  
> *Notes:*
>  * Changing bridge configuration 
> {_}useDuplicateDetection{_},{_}confirmationWindowSize{_} or 
> _producerWindowSize_ parameter do not help to mitigate the issue
>  * Not related to large messages, i use large messages in my test to ease 
> reproduction 
>  * Reproduced on 2.30 and 2.34
>  * Calling pause() does not create duplicate 
> {_}server.getClusterManager().getBridges().get(bridgeName).pause(){_};
>  
>  
>  
> *UPDATE:* When using pause instead of stop in above scenari, I get message 
> not being develirable anymore
> {*}Summary{*}:
> When a bridge is paused while *large* messages being produced to the target 
> node, it can lead to message not able to be delivered to new consumers.
> {*}Description{*}:
> When Using bridge and programmatically pausing it while messages are being 
> produced to the target node, If large messages are being delivered, the 
> thread In _BridgeImpl::deliverLargeMessage_ is not awaited, and the bridge is 
> paused then the Runnable of deliverLargeMessage is being run, leading to a 
> situation were the message won't be delivered to new consumers
> {*}Notes{*}:
>  * PauseRunnable does not await for task in {{executor}} to complete, 
> deliverLargeMessage do create task in executor
>  * 
>  ** We can see that even after PauseRunnable has complete, 
> deliverLargeMessage's task is running after.
>  * If I call {{bridge1.onCreditsFlow(true, null);}} to set the flag 
> {{blockedOnFlowControl}} to true, before calling pause, it prevent putting 
> new task on executor and mitigate the issue, but It feels weird and I think 
> there might still be race condition



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to