[ 
https://issues.apache.org/jira/browse/ARTEMIS-3557?focusedWorklogId=684651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-684651
 ]

ASF GitHub Bot logged work on ARTEMIS-3557:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Nov/21 12:13
            Start Date: 22/Nov/21 12:13
    Worklog Time Spent: 10m 
      Work Description: gtully commented on a change in pull request #3858:
URL: https://github.com/apache/activemq-artemis/pull/3858#discussion_r754136911



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification notification) {
 
                queueInfos.put(clusterName, info);
 
+               if (distance < 1) {
+                  //Binding added locally. If a matching remote binding with consumers exist, add a redistributor
+                  Binding binding = getBinding(routingName);
+
+                  if (binding != null) {
+                     try {
+                        Bindings bindings = getBindingsForAddress(address);
+
+                        for (Binding bind : bindings.getBindings()) {
+                           if (bind.isConnected() && bind instanceof RemoteQueueBinding) {
+
+                              RemoteQueueBinding remoteBinding = (RemoteQueueBinding) bind;
+
+                              if (remoteBinding.consumerCount() > 0) {
+
+                                 Queue queue = (Queue) binding.getBindable();
+                                 AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+                                 long redistributionDelay = addressSettings.getRedistributionDelay();
+
+                                 if (redistributionDelay != -1) {
+                                    queue.addRedistributor(redistributionDelay);

Review comment:
       Hmm, I think we need a non-zero redistribution delay here. If the delay
is 0, we risk moving messages in the window between binding creation and
consumer creation, and creating the binding first and the consumer afterwards
is the normal pattern for a JMS consumer.
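
A minimal sketch of that race, assuming times are plain milliseconds measured
from the moment the local binding is created. This is not Artemis code:
`RedistributionWindow` and `movesBeforeConsumer` are hypothetical names used
only to illustrate the ordering. The one detail taken from the diff is that a
delay of -1 means no redistributor is added.

```java
/** Hypothetical model of the ordering concern; not part of Artemis. */
public final class RedistributionWindow {

   private RedistributionWindow() {
   }

   /**
    * @param redistributionDelayMs delay from the address settings; -1 disables redistribution
    * @param consumerAttachMs      time after binding creation at which the local consumer attaches
    * @return true if redistribution would start before the local consumer exists
    */
   public static boolean movesBeforeConsumer(long redistributionDelayMs, long consumerAttachMs) {
      if (redistributionDelayMs == -1) {
         return false; // redistribution disabled, nothing is moved
      }
      // The redistributor fires once the delay has elapsed; if the consumer
      // attaches later than that, messages may already have been moved away.
      return redistributionDelayMs < consumerAttachMs;
   }
}
```

With a delay of 0, any consumer that attaches even slightly after the binding
is created falls inside the window; a delay longer than the usual gap between
binding creation and consumer creation closes it.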

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
##########
@@ -723,7 +723,7 @@ protected void sendInRange(final int node,
                               final int msgEnd,
                               final boolean durable,
                               final String filterVal) throws Exception {
-      sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);

Review comment:
       These changes don't seem to be necessary for the "redistribute to old 
bindings" use case?

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
       verifyReceiveAll(2, 1);
    }
 
+   @Test
+   public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws Exception {
+
+      String address = "test.address";
+      String queue = "test.address";
+      String clusterAddress = "test";
+      AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);

Review comment:
       To get this test to work, I needed to pass a value > 0 to 
setRedistributionDelay. Otherwise the assert on line 831, which waits for some 
message count on 1, fails because the messages have already been redistributed.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification notification) {
 
                queueInfos.put(clusterName, info);
 
+               if (distance < 1) {
+                  //Binding added locally. If a matching remote binding with consumers exist, add a redistributor
+                  Binding binding = getBinding(routingName);
+
+                  if (binding != null) {
+                     try {
+                        Bindings bindings = getBindingsForAddress(address);
+
+                        for (Binding bind : bindings.getBindings()) {
+                           if (bind.isConnected() && bind instanceof RemoteQueueBinding) {
+
+                              RemoteQueueBinding remoteBinding = (RemoteQueueBinding) bind;
+
+                              if (remoteBinding.consumerCount() > 0) {
+
+                                 Queue queue = (Queue) binding.getBindable();
+                                 AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+                                 long redistributionDelay = addressSettings.getRedistributionDelay();
+
+                                 if (redistributionDelay != -1) {

Review comment:
       I wonder if we want to bail out earlier when this condition is not met, 
rather than checking each remoteBinding's consumer count etc. Maybe check the 
addressSettings at line 319.
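
Roughly what I have in mind, as a self-contained sketch rather than a patch.
`EarlyExitSketch`, `RemoteBindingView` and `needsRedistributor` are
hypothetical stand-ins for the real Artemis types; only the control flow is
the point: read the redistribution delay once, return early when it is -1, and
only then walk the remote bindings.

```java
import java.util.List;

/** Hypothetical stand-ins for the Artemis types; only the control flow matters. */
public final class EarlyExitSketch {

   /** Minimal view of a remote binding (an assumption, not the real RemoteQueueBinding). */
   public static final class RemoteBindingView {
      final boolean connected;
      final int consumerCount;

      public RemoteBindingView(boolean connected, int consumerCount) {
         this.connected = connected;
         this.consumerCount = consumerCount;
      }
   }

   private EarlyExitSketch() {
   }

   /**
    * Decides whether a redistributor should be added for a newly created local binding.
    * The delay is checked first so the bindings are never inspected when redistribution is off.
    */
   public static boolean needsRedistributor(long redistributionDelay, List<RemoteBindingView> remoteBindings) {
      if (redistributionDelay == -1) {
         return false; // redistribution disabled: skip the loop entirely
      }
      for (RemoteBindingView remote : remoteBindings) {
         if (remote.connected && remote.consumerCount > 0) {
            return true; // one connected remote binding with consumers is enough
         }
      }
      return false;
   }
}
```

This also stops at the first matching remote binding, so the redistributor
would be added at most once per notification.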

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
       verifyReceiveAll(2, 1);
    }
 
+   @Test
+   public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws Exception {
+
+      String address = "test.address";
+      String queue = "test.address";
+      String clusterAddress = "test";
+      AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
+      RoutingType routingType = RoutingType.ANYCAST;
+
+      getServer(0).getAddressSettingsRepository().addMatch(address, settings);
+      getServer(1).getAddressSettingsRepository().addMatch(address, settings);
+
+      setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, address, queue, null, true, routingType);
+      addConsumer(0, 0, queue, null);
+      waitForBindings(0, address, 1, 1, true);
+
+      Thread.sleep(3000);

Review comment:
       I don't think we need this sleep, because we are already waiting on the 
binding propagation.
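
For illustration, the general idea of waiting on a condition instead of
sleeping for a fixed time can be sketched as below. `PollingWait` and
`waitFor` are hypothetical names and this is not the test base's
implementation; in the test itself the existing waitForBindings call already
plays this role.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; returns as soon as the condition holds. */
public final class PollingWait {

   private PollingWait() {
   }

   /**
    * Polls the condition until it is true or the timeout expires.
    *
    * @return true if the condition became true in time, false on timeout or interrupt
    */
   public static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pollMs) {
      final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
      while (true) {
         if (condition.getAsBoolean()) {
            return true;
         }
         if (System.nanoTime() - deadline >= 0) {
            return false; // timed out without the condition holding
         }
         try {
            Thread.sleep(pollMs);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
         }
      }
   }
}
```

Unlike a fixed Thread.sleep(3000), this costs only as long as the condition
actually takes to become true, and it fails visibly when it never does.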






Issue Time Tracking
-------------------

    Worklog Id:     (was: 684651)
    Time Spent: 1h  (was: 50m)

> ARTEMIS-1925 fix does not handle redistribution to "old" consumers
> ------------------------------------------------------------------
>
>                 Key: ARTEMIS-3557
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3557
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Anton Roskvist
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> OFF_WITH_REDISTRIBUTION does not handle this scenario:
> If a destination and consumer exist on one node in a cluster and a producer 
> shows up on another node, messages will not get redistributed until the old 
> consumer disconnects and reconnects.



