gtully commented on a change in pull request #3858:
URL: https://github.com/apache/activemq-artemis/pull/3858#discussion_r754136911



##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification 
notification) {
 
                queueInfos.put(clusterName, info);
 
+               if (distance < 1) {
+                  //Binding added locally. If a matching remote binding with 
consumers exist, add a redistributor
+                  Binding binding = getBinding(routingName);
+
+                  if (binding != null) {
+                     try {
+                        Bindings bindings = getBindingsForAddress(address);
+
+                        for (Binding bind : bindings.getBindings()) {
+                           if (bind.isConnected() && bind instanceof 
RemoteQueueBinding) {
+
+                              RemoteQueueBinding remoteBinding = 
(RemoteQueueBinding) bind;
+
+                              if (remoteBinding.consumerCount() > 0) {
+
+                                 Queue queue = (Queue) binding.getBindable();
+                                 AddressSettings addressSettings = 
addressSettingsRepository.getMatch(binding.getAddress().toString());
+                                 long redistributionDelay = 
addressSettings.getRedistributionDelay();
+
+                                 if (redistributionDelay != -1) {
+                                    
queue.addRedistributor(redistributionDelay);

Review comment:
       hmm, I think we need a redistribution delay here, if delay is 0 we risk 
moving messages between binding creation and consumer creation, which is the 
normal pattern for a JMS consumer.

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
##########
@@ -723,7 +723,7 @@ protected void sendInRange(final int node,
                               final int msgEnd,
                               final boolean durable,
                               final String filterVal) throws Exception {
-      sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);

Review comment:
       these changes don't seem to be necessary for the "redistribute to old 
bindings" use case?

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void 
testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
       verifyReceiveAll(2, 1);
    }
 
+   @Test
+   public void 
testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws 
Exception {
+
+      String address = "test.address";
+      String queue = "test.address";
+      String clusterAddress = "test";
+      AddressSettings settings = new 
AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);

Review comment:
       To have this test work, I needed to add a > 0 setRedistributionDelay, 
otherwise the assert on 831 - waiting for some message count on 1 would fail 
b/c the messages were already moved/redistributed.

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification 
notification) {
 
                queueInfos.put(clusterName, info);
 
+               if (distance < 1) {
+                  //Binding added locally. If a matching remote binding with 
consumers exist, add a redistributor
+                  Binding binding = getBinding(routingName);
+
+                  if (binding != null) {
+                     try {
+                        Bindings bindings = getBindingsForAddress(address);
+
+                        for (Binding bind : bindings.getBindings()) {
+                           if (bind.isConnected() && bind instanceof 
RemoteQueueBinding) {
+
+                              RemoteQueueBinding remoteBinding = 
(RemoteQueueBinding) bind;
+
+                              if (remoteBinding.consumerCount() > 0) {
+
+                                 Queue queue = (Queue) binding.getBindable();
+                                 AddressSettings addressSettings = 
addressSettingsRepository.getMatch(binding.getAddress().toString());
+                                 long redistributionDelay = 
addressSettings.getRedistributionDelay();
+
+                                 if (redistributionDelay != -1) {

Review comment:
       I wonder if we want to break earlier if this condition is not met rather 
than checking each remoteBinding consumer count etc. maybe check the 
addressSettings at line:319

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void 
testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
       verifyReceiveAll(2, 1);
    }
 
+   @Test
+   public void 
testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws 
Exception {
+
+      String address = "test.address";
+      String queue = "test.address";
+      String clusterAddress = "test";
+      AddressSettings settings = new 
AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
+      RoutingType routingType = RoutingType.ANYCAST;
+
+      getServer(0).getAddressSettingsRepository().addMatch(address, settings);
+      getServer(1).getAddressSettingsRepository().addMatch(address, settings);
+
+      setupClusterConnection("cluster0", clusterAddress, 
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", clusterAddress, 
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, address, queue, null, true, routingType);
+      addConsumer(0, 0, queue, null);
+      waitForBindings(0, address, 1, 1, true);
+
+      Thread.sleep(3000);

Review comment:
       I don't think we need this sleep b/c we are waiting on the binding 
propagation




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to