[
https://issues.apache.org/jira/browse/ARTEMIS-3557?focusedWorklogId=684651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-684651
]
ASF GitHub Bot logged work on ARTEMIS-3557:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Nov/21 12:13
Start Date: 22/Nov/21 12:13
Worklog Time Spent: 10m
Work Description: gtully commented on a change in pull request #3858:
URL: https://github.com/apache/activemq-artemis/pull/3858#discussion_r754136911
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification notification) {
queueInfos.put(clusterName, info);
+ if (distance < 1) {
+ //Binding added locally. If a matching remote binding with consumers exist, add a redistributor
+ Binding binding = getBinding(routingName);
+
+ if (binding != null) {
+ try {
+ Bindings bindings = getBindingsForAddress(address);
+
+ for (Binding bind : bindings.getBindings()) {
+ if (bind.isConnected() && bind instanceof RemoteQueueBinding) {
+
+ RemoteQueueBinding remoteBinding = (RemoteQueueBinding) bind;
+
+ if (remoteBinding.consumerCount() > 0) {
+
+ Queue queue = (Queue) binding.getBindable();
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+ long redistributionDelay = addressSettings.getRedistributionDelay();
+
+ if (redistributionDelay != -1) {
+ queue.addRedistributor(redistributionDelay);
Review comment:
Hmm, I think we need a non-zero redistribution delay here. If the delay is 0,
we risk moving messages in the window between binding creation and consumer
creation, which is the normal pattern for a JMS consumer.
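The timing concern above can be modelled roughly as follows. This is a simplified sketch, not Artemis code: it assumes a redistributor fires once the configured delay has elapsed after binding creation, and that a local consumer attaching before then keeps the messages local. The class and method names are hypothetical.

```java
// Simplified timing model for illustration only; NOT the Artemis implementation.
// Assumption: redistribution fires `redistributionDelayMillis` after the binding
// is created, unless a local consumer has attached before that point.
final class RedistributionDelaySketch {

    /**
     * Returns true if messages would be moved away before the local consumer attaches.
     *
     * @param redistributionDelayMillis configured delay; -1 means redistribution is disabled
     * @param consumerAttachAfterMillis time between binding creation and consumer creation
     */
    static boolean movedBeforeConsumer(long redistributionDelayMillis, long consumerAttachAfterMillis) {
        if (redistributionDelayMillis < 0) {
            return false; // -1 disables redistribution, mirroring the check in the hunk above
        }
        // With a delay of 0, any gap at all between binding and consumer creation loses the race.
        return redistributionDelayMillis < consumerAttachAfterMillis;
    }
}
```

Under this model, a delay of 0 with a consumer that attaches 5 ms after the binding loses the messages to the remote node, while a delay of 1000 ms does not.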
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
##########
@@ -723,7 +723,7 @@ protected void sendInRange(final int node,
final int msgEnd,
final boolean durable,
final String filterVal) throws Exception {
- sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);
Review comment:
These changes don't seem to be necessary for the "redistribute to old
bindings" use case?
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
verifyReceiveAll(2, 1);
}
+ @Test
+ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws Exception {
+
+ String address = "test.address";
+ String queue = "test.address";
+ String clusterAddress = "test";
+ AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
Review comment:
To get this test to work, I needed to set a redistribution delay > 0 via
setRedistributionDelay. Otherwise the assert on line 831, which waits for some
message count on 1, would fail because the messages had already been
moved/redistributed.
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
##########
@@ -315,6 +315,37 @@ public void onNotification(final Notification notification) {
queueInfos.put(clusterName, info);
+ if (distance < 1) {
+ //Binding added locally. If a matching remote binding with consumers exist, add a redistributor
+ Binding binding = getBinding(routingName);
+
+ if (binding != null) {
+ try {
+ Bindings bindings = getBindingsForAddress(address);
+
+ for (Binding bind : bindings.getBindings()) {
+ if (bind.isConnected() && bind instanceof RemoteQueueBinding) {
+
+ RemoteQueueBinding remoteBinding = (RemoteQueueBinding) bind;
+
+ if (remoteBinding.consumerCount() > 0) {
+
+ Queue queue = (Queue) binding.getBindable();
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+ long redistributionDelay = addressSettings.getRedistributionDelay();
+
+ if (redistributionDelay != -1) {
Review comment:
I wonder if we want to break out earlier when this condition is not met, rather
than checking each remoteBinding's consumer count etc. Maybe check the
addressSettings at line 319.
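A minimal sketch of the reordering suggested above: test the redistribution delay once, before looping over the bindings. The types here are hypothetical stand-ins written for illustration; they are not the Artemis Binding or RemoteQueueBinding classes, and the method name is an assumption.

```java
import java.util.List;

// Illustrative stand-ins only; these are NOT the Artemis types.
final class EarlyExitSketch {

    /** Simplified remote binding exposing only what the loop inspects. */
    static final class RemoteBindingStub {
        final boolean connected;
        final int consumerCount;

        RemoteBindingStub(boolean connected, int consumerCount) {
            this.connected = connected;
            this.consumerCount = consumerCount;
        }
    }

    /** Mirrors the -1 value tested in the hunk above. */
    static final long REDISTRIBUTION_DISABLED = -1L;

    /** Decides whether a redistributor should be added for a newly created local binding. */
    static boolean shouldAddRedistributor(long redistributionDelay, List<RemoteBindingStub> remoteBindings) {
        // Cheap check first: skip the scan entirely when redistribution is disabled.
        if (redistributionDelay == REDISTRIBUTION_DISABLED) {
            return false;
        }
        for (RemoteBindingStub binding : remoteBindings) {
            if (binding.connected && binding.consumerCount > 0) {
                return true; // one connected remote binding with consumers is enough
            }
        }
        return false;
    }
}
```

The behaviour is the same as checking the delay inside the loop; the only difference is that the address settings are consulted once instead of once per matching remote binding.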
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
##########
@@ -794,6 +794,46 @@ public void testRedistributionOnlyWhenLocalRemovedLbOffWithRedistribution() thro
verifyReceiveAll(2, 1);
}
+ @Test
+ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws Exception {
+
+ String address = "test.address";
+ String queue = "test.address";
+ String clusterAddress = "test";
+ AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
+ RoutingType routingType = RoutingType.ANYCAST;
+
+ getServer(0).getAddressSettingsRepository().addMatch(address, settings);
+ getServer(1).getAddressSettingsRepository().addMatch(address, settings);
+
+ setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, address, queue, null, true, routingType);
+ addConsumer(0, 0, queue, null);
+ waitForBindings(0, address, 1, 1, true);
+
+ Thread.sleep(3000);
Review comment:
I don't think we need this sleep, because we are already waiting on the
binding propagation.
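For reference, waiting on a condition instead of sleeping for a fixed time can be sketched as below. The helper is hypothetical and written only for illustration; it is not the test base class's waitForBindings, whose internals are not shown in this thread.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for illustration; not part of the Artemis test utilities.
final class WaitSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true as soon as the condition is met, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }
}
```

A wait of this kind returns as soon as the state is reached, so the test neither wastes three seconds on fast machines nor fails on slow ones where three seconds is not enough.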
Issue Time Tracking
-------------------
Worklog Id: (was: 684651)
Time Spent: 1h (was: 50m)
> ARTEMIS-1925 fix does not handle redistribution to "old" consumers
> ------------------------------------------------------------------
>
> Key: ARTEMIS-3557
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3557
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Anton Roskvist
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> OFF_WITH_REDISTRIBUTION does not handle this scenario:
> If a destination and consumer exist on one node in a cluster and a producer
> shows up on another node, messages will not get redistributed until the old
> consumer disconnects and reconnects.