[
https://issues.apache.org/jira/browse/ARTEMIS-3594?focusedWorklogId=691737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-691737
]
ASF GitHub Bot logged work on ARTEMIS-3594:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/21 13:16
Start Date: 07/Dec/21 13:16
Worklog Time Spent: 10m
Work Description: gtully commented on a change in pull request #3867:
URL: https://github.com/apache/activemq-artemis/pull/3867#discussion_r763981312
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AutoClientIDShardClusterTest.java
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import
org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import
org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
+import
org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AutoClientIDShardClusterTest extends BalancingTestBase {
+
+ @Parameterized.Parameters(name = "protocol: {0}")
+ public static Collection<Object[]> data() {
+ final String[] protocols = new String[] {AMQP_PROTOCOL, CORE_PROTOCOL,
OPENWIRE_PROTOCOL};
+ Collection<Object[]> data = new ArrayList<>();
+ for (String protocol : protocols) {
+ data.add(new Object[] {protocol});
+ }
+ return data;
+ }
+
+ private final String protocol;
+ final int numMessages = 50;
+ AtomicInteger toSend = new AtomicInteger(numMessages);
+
+ public AutoClientIDShardClusterTest(String protocol) {
+ this.protocol = protocol;
+ }
+
+ protected void setupServers() throws Exception {
+ for (int i = 0; i < 2; i++) {
+ setupLiveServer(i, true, HAType.SharedNothingReplication, true,
false);
+ servers[i].addProtocolManagerFactory(new
ProtonProtocolManagerFactory());
+ servers[i].addProtocolManagerFactory(new
OpenWireProtocolManagerFactory());
+ }
+ setupClusterConnection("cluster0", "T",
MessageLoadBalancingType.ON_DEMAND, 1,true, 0, 1);
+ setupClusterConnection("cluster1", "T",
MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
+ toSend.set(numMessages);
+ }
+
+ Thread producer = new Thread(new Runnable() {
+ final AtomicInteger producerSeq = new AtomicInteger();
+
+ @Override
+ public void run() {
+ while (toSend.get() > 0) {
+ try {
+ ConnectionFactory connectionFactory = createFactory(protocol,
"producer", "admin", "admin");
+ try (Connection connection =
connectionFactory.createConnection()) {
+ connection.start();
+ try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
+ javax.jms.Topic topic = session.createTopic("T");
+ try (MessageProducer producer =
session.createProducer(topic)) {
+ for (int i = 0; i < 10 && toSend.get() > 0; i++) {
+ Message message = session.createTextMessage();
+ message.setIntProperty("SEQ", producerSeq.get() +
1);
+ producer.send(message);
+ producerSeq.incrementAndGet();
+ toSend.decrementAndGet();
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
+ }
+ } catch (Exception ok) {
+ }
+ }
+ }
+ });
+
+ class DurableSub implements Runnable {
+
+ final String id;
+ int receivedInOrder = -1;
+ int lastReceived;
+ int maxReceived;
+ AtomicBoolean consumerDone = new AtomicBoolean();
+ AtomicBoolean orderShot = new AtomicBoolean();
+ CountDownLatch registered = new CountDownLatch(1);
+
+ DurableSub(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+ while (!consumerDone.get()) {
+ try {
+ ConnectionFactory connectionFactory = createFactory(protocol,
"ClientId-" + id, "admin", "admin");
+ try (Connection connection =
connectionFactory.createConnection()) {
+ connection.start();
+ try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
+ javax.jms.Topic topic = session.createTopic("T");
+ try (TopicSubscriber durableSubscriber =
session.createDurableSubscriber(topic, "Sub-" + id)) {
+ registered.countDown();
+ for (int i = 0; i < 5; i++) {
+ Message message = durableSubscriber.receive(500);
+ if (message != null) {
+ lastReceived = message.getIntProperty("SEQ");
+ if (lastReceived > maxReceived) {
+ maxReceived = lastReceived;
+ }
+ if (receivedInOrder < 0) {
+ receivedInOrder = lastReceived;
+ } else if (receivedInOrder == lastReceived - 1) {
+ receivedInOrder++;
+ } else {
+ if (!orderShot.get()) {
+ System.err.println("Sub: " + id + ",
received: out of order " + lastReceived + ", last in order: " +
receivedInOrder);
+ }
+ orderShot.set(true);
+ }
+ } else {
+ // no point trying again if there is nothing for
us now.
+ break;
+ }
+ }
+ TimeUnit.MILLISECONDS.sleep(500);
+ }
+ }
+ }
+ } catch (Exception ok) {
+ }
+ }
+ }
+ }
+
+ @Ignore("not totally reliable, but does show the root cause of the problem
being solved")
+ public void testWithoutOutSharding() throws Exception {
+ setupServers();
+ startServers(0, 1);
+
+ // two bouncy durable consumers
+ DurableSub sub0 = new DurableSub("0");
+ DurableSub sub1 = new DurableSub("1");
+
+ new Thread(sub0).start();
+ new Thread(sub1).start();
+
+ // waiting for registration before production to give bridges a chance
+ assertTrue(sub0.registered.await(20, TimeUnit.SECONDS));
+ assertTrue(sub1.registered.await(20, TimeUnit.SECONDS));
+
+
+ // wait for bindings!
+ assertTrue(Wait.waitFor(() -> 2 == gatherBindingsOnT(0)[0], 20000,
2000));
+
+ assertTrue(Wait.waitFor(() -> 2 == gatherBindingsOnT(1)[0], 20000,
2000));
+
+
+ // wait for remote bindings!
+ assertTrue(Wait.waitFor(() -> 2 == gatherBindingsOnT(0)[1], 20000,
2000));
+
+ assertTrue(Wait.waitFor(() -> 2 == gatherBindingsOnT(1)[1], 20000,
2000));
+
+ // produce a few every second with failover randomize=true to Topic T
+ producer.start();
+
+
+ assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0));
+
+ assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived ==
numMessages));
+
+ assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived ==
numMessages));
+
+
+ // with bouncing, one 'may' be out of order, hence ignored
+ assertTrue(sub0.orderShot.get() || sub1.orderShot.get());
+
+ sub0.consumerDone.set(true);
+ sub1.consumerDone.set(true);
+
+ stopServers(0, 1);
+ }
+
+ @Test
+ public void testWithConsistentHashClientIDModTwo() throws Exception {
Review comment:
I want to keep them separate but similar, because testWithoutOutSharding
is non-deterministic but still informative: it shows the potential problem
when producers and consumers bounce around a cluster.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 691737)
Time Spent: 2h (was: 1h 50m)
> Broker balancer - allow a key transformer for a local-target-filter
> -------------------------------------------------------------------
>
> Key: ARTEMIS-3594
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3594
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: balancer
> Affects Versions: 2.19.0
> Reporter: Gary Tully
> Assignee: Gary Tully
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Add support for a CONSISTENT_HASH_MODULO key transformer that can be used for
> JMS durable subscriptions in a cluster.
> The transformer takes a consistent hash of the clientID and turns it into an
> int less than numBrokers, such that each value is mapped to a particular broker.
> In a simple 2-broker cluster, this avoids the need for a durable subscriber
> to know about its broker; the balancer will only allow it to connect to the
> right one.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)