gtully commented on a change in pull request #3858:
URL: https://github.com/apache/activemq-artemis/pull/3858#discussion_r754136911
##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification notification) {
queueInfos.put(clusterName, info);
+ if (distance < 1) {
+ //Binding added locally. If a matching remote binding with consumers exist, add a redistributor
+ Binding binding = getBinding(routingName);
+
+ if (binding != null) {
+ try {
+ Bindings bindings = getBindingsForAddress(address);
+
+ for (Binding bind : bindings.getBindings()) {
+ if (bind.isConnected() && bind instanceof RemoteQueueBinding) {
+
+ RemoteQueueBinding remoteBinding = (RemoteQueueBinding) bind;
+
+ if (remoteBinding.consumerCount() > 0) {
+
+ Queue queue = (Queue) binding.getBindable();
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+ long redistributionDelay = addressSettings.getRedistributionDelay();
+
+ if (redistributionDelay != -1) {
+ queue.addRedistributor(redistributionDelay);
Review comment:
I think we need a non-zero redistribution delay here. With a delay of 0 we
risk moving messages in the window between binding creation and consumer
creation, and creating the binding first and the consumer second is the normal
pattern for a JMS consumer.
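
A minimal sketch of that window, in plain Java with a simulated clock. It is
not Artemis code: `LocalQueue`, `addRedistributor(now, delay)`, `tick` and
`messagesLeftForLocalConsumer` are hypothetical names used only for
illustration, and the sketch assumes that a local consumer arriving cancels a
pending redistribution.

```java
final class RedistributionWindowSketch {

    // Hypothetical stand-in for a local queue; not the Artemis Queue API.
    static final class LocalQueue {
        int messages;
        private long redistributeAt = -1; // simulated time; -1 means nothing is pending
        private boolean hasLocalConsumer;

        LocalQueue(int messages) {
            this.messages = messages;
        }

        // A remote binding with consumers was seen at time `now`.
        void addRedistributor(long now, long delay) {
            redistributeAt = now + delay;
            tick(now); // a delay of 0 fires immediately
        }

        // Assumption for this sketch: a local consumer cancels a pending redistribution.
        void addLocalConsumer() {
            hasLocalConsumer = true;
            redistributeAt = -1;
        }

        // Advance the simulated clock and move the messages away once the delay has elapsed.
        void tick(long now) {
            if (redistributeAt >= 0 && now >= redistributeAt && !hasLocalConsumer) {
                messages = 0;
                redistributeAt = -1;
            }
        }
    }

    // The binding is created at time 0; the local consumer attaches at `consumerArrivesAt`.
    static int messagesLeftForLocalConsumer(long delay, long consumerArrivesAt) {
        LocalQueue queue = new LocalQueue(10);
        queue.addRedistributor(0, delay);
        for (long t = 0; t < consumerArrivesAt; t++) {
            queue.tick(t);
        }
        queue.addLocalConsumer();
        return queue.messages;
    }

    public static void main(String[] args) {
        System.out.println(messagesLeftForLocalConsumer(0, 5));   // delay 0: messages already moved
        System.out.println(messagesLeftForLocalConsumer(100, 5)); // delay 100: consumer arrives in time
    }
}
```

With a delay of 0 the local consumer finds an empty queue. With any delay
longer than the gap between binding creation and consumer creation, the
messages stay local.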
##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
##########
@@ -723,7 +723,7 @@ protected void sendInRange(final int node,
final int msgEnd,
final boolean durable,
final String filterVal) throws Exception {
- sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);
Review comment:
These changes don't seem to be necessary for the "redistribute to old
bindings" use case?
##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
verifyReceiveAll(2, 1);
}
+ @Test
+ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws Exception {
+
+ String address = "test.address";
+ String queue = "test.address";
+ String clusterAddress = "test";
+ AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
Review comment:
To get this test to work, I needed to pass a value > 0 to
setRedistributionDelay. Otherwise the assert on line 831, which waits for some
message count on 1, would fail because the messages had already been
moved/redistributed.
##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification notification) {
queueInfos.put(clusterName, info);
+ if (distance < 1) {
+ //Binding added locally. If a matching remote binding with consumers exist, add a redistributor
+ Binding binding = getBinding(routingName);
+
+ if (binding != null) {
+ try {
+ Bindings bindings = getBindingsForAddress(address);
+
+ for (Binding bind : bindings.getBindings()) {
+ if (bind.isConnected() && bind instanceof RemoteQueueBinding) {
+
+ RemoteQueueBinding remoteBinding = (RemoteQueueBinding) bind;
+
+ if (remoteBinding.consumerCount() > 0) {
+
+ Queue queue = (Queue) binding.getBindable();
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+ long redistributionDelay = addressSettings.getRedistributionDelay();
+
+ if (redistributionDelay != -1) {
Review comment:
I wonder if we want to bail out earlier when this condition is not met, rather
than checking each remoteBinding's consumer count etc. Maybe check the
addressSettings at line 319.
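
A rough sketch of the reordering suggested here, in plain Java. It is not the
PostOfficeImpl code: `RemoteBindingInfo`, `shouldAddRedistributor` and the
`bindingsInspected` counter are hypothetical stand-ins used only to show the
guard being evaluated before the loop. The -1 "disabled" value is taken from
the condition in the hunk above.

```java
import java.util.Arrays;
import java.util.List;

final class EarlyExitSketch {

    // Hypothetical stand-in for a remote binding; not the Artemis RemoteQueueBinding API.
    static final class RemoteBindingInfo {
        final boolean connected;
        final int consumerCount;

        RemoteBindingInfo(boolean connected, int consumerCount) {
            this.connected = connected;
            this.consumerCount = consumerCount;
        }
    }

    // Number of bindings the last call inspected; only here to make the early exit visible.
    static int bindingsInspected;

    static boolean shouldAddRedistributor(long redistributionDelay, List<RemoteBindingInfo> remoteBindings) {
        bindingsInspected = 0;
        // Guard first: -1 means redistribution is disabled, so no binding needs to be examined.
        if (redistributionDelay == -1) {
            return false;
        }
        for (RemoteBindingInfo binding : remoteBindings) {
            bindingsInspected++;
            if (binding.connected && binding.consumerCount > 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<RemoteBindingInfo> remote = Arrays.asList(
            new RemoteBindingInfo(true, 0),
            new RemoteBindingInfo(true, 2));
        System.out.println(shouldAddRedistributor(-1, remote) + " after inspecting " + bindingsInspected);
        System.out.println(shouldAddRedistributor(1000, remote) + " after inspecting " + bindingsInspected);
    }
}
```

The address settings do not depend on which remote binding is being examined,
so looking them up once before the loop gives the same result with less work
when redistribution is disabled.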
##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
verifyReceiveAll(2, 1);
}
+ @Test
+ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws Exception {
+
+ String address = "test.address";
+ String queue = "test.address";
+ String clusterAddress = "test";
+ AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
+ RoutingType routingType = RoutingType.ANYCAST;
+
+ getServer(0).getAddressSettingsRepository().addMatch(address, settings);
+ getServer(1).getAddressSettingsRepository().addMatch(address, settings);
+
+ setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, address, queue, null, true, routingType);
+ addConsumer(0, 0, queue, null);
+ waitForBindings(0, address, 1, 1, true);
+
+ Thread.sleep(3000);
Review comment:
I don't think we need this sleep, because the waitForBindings call just above
it already waits on the binding propagation.
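
For illustration, a minimal sketch of waiting on a condition instead of
sleeping for a fixed time, in plain Java. `WaitSketch.waitFor` is a
hypothetical helper written for this example, not the test base class's
waitForBindings and not an Artemis utility.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class WaitSketch {

    // Polls `condition` until it holds or `timeoutMillis` elapses.
    // Returns true as soon as the condition is met, false on timeout or interruption.
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 50 ms after start.
        boolean met = waitFor(() -> System.currentTimeMillis() - start >= 50, 3000, 10);
        System.out.println("condition met: " + met);
    }
}
```

A wait of this shape returns as soon as the state is reached, so the test
neither pays the full 3 seconds nor fails on a slow machine where 3 seconds
is not enough.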