brusdev commented on a change in pull request #3863: URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r759323382
########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java ########## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.balancing; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.security.auth.Subject; +import java.io.File; +import java.net.URI; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; +import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl; +import 
org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5; +import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal; +import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal; +import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.junit.Assert; +import org.junit.Test; + +public class ElasticQueueTest extends ActiveMQTestBase { + + static final String qName = "EQ"; + static final SimpleString qNameSimple = SimpleString.toSimpleString(qName); + + final int base_port = 61616; + final int opTimeoutMillis = 2000; + final Stack<EmbeddedActiveMQ> nodes = new Stack<>(); + private final String balancerConfigName = "role_name_sharder"; + + String urlForNodes(Stack<EmbeddedActiveMQ> nodes) { + StringBuilder builder = new 
StringBuilder("failover:("); + int port_start = base_port; + for (EmbeddedActiveMQ activeMQ : nodes) { + if (port_start != base_port) { + builder.append(","); + } + builder.append("amqp://localhost:" + (port_start++)); + } + // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit + builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis); + return builder.toString(); + } + + // allow tracking of failover reconnects + class ConnectionListener implements JmsConnectionListener { + + AtomicInteger connectionCount; + + ConnectionListener(AtomicInteger connectionCount) { + this.connectionCount = connectionCount; + } + + @Override + public void onConnectionEstablished(URI uri) { + } + + @Override + public void onConnectionFailure(Throwable throwable) { + } + + @Override + public void onConnectionInterrupted(URI uri) { + } + + @Override + public void onConnectionRestored(URI uri) { + connectionCount.incrementAndGet(); + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) { + } + + @Override + public void onSessionClosed(Session session, Throwable throwable) { + } + + @Override + public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) { + } + + @Override + public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) { + } + } + + // slow consumer + class EQConsumer extends Thread { + + final AtomicInteger consumedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + private final String url; + private final int delayMillis; + long lastConsumed = 0; + + EQConsumer(String url) { + this(url, 1000); + } + + EQConsumer(String url, int delay) { + this.url = url; + this.delayMillis = delay; + } + + @Override + public void run() { + + while (!done.get()) { + JmsConnectionFactory factory = new 
JmsConnectionFactory("CONSUMER", "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName)); + + while (!done.get()) { + Message receivedMessage = messageConsumer.receiveNoWait(); + if (receivedMessage != null) { + consumedCount.incrementAndGet(); + lastConsumed = receivedMessage.getLongProperty("PID"); + } else { + TimeUnit.MILLISECONDS.sleep(delayMillis); + } + } + } catch (JMSException | InterruptedException e) { + } + } + } + + public long getLastConsumed() { + return lastConsumed; + } + } + + // regular producer + class EQProducer extends Thread { + + final AtomicInteger producedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + private final String url; + + EQProducer(String url) { + this.url = url; + } + + @Override + public void run() { + + while (!done.get()) { + JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(session.createQueue(qName)); + + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[1024]); + while (!done.get()) { + message.setLongProperty("PID", producedCount.get() + 1); + messageProducer.send(message); + 
producedCount.incrementAndGet(); + } + } catch (ResourceAllocationException expected) { + } catch (JMSException e) { + } + } + } + + public long getLastProduced() { + return producedCount.get(); + } + } + + // combined producer/ async consumer + class EQProducerAsyncConsumer extends Thread { + + final AtomicInteger producedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + final AtomicBoolean producerDone = new AtomicBoolean(); + private final String url; + final AtomicInteger consumedCount = new AtomicInteger(); + private final String user; + private long lastConsumed; + + EQProducerAsyncConsumer(String url, String user) { + this.url = url; + this.user = user; + } + + @Override + public void run() { + + while (!done.get()) { + JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName)); + // consume async + messageConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + consumedCount.incrementAndGet(); + try { + lastConsumed = message.getLongProperty("PID"); + if (!producerDone.get()) { + TimeUnit.SECONDS.sleep(1); + } + message.acknowledge(); + } catch (JMSException | InterruptedException ignored) { + } + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(session.createQueue(qName)); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new 
byte[1024]); + while (!done.get()) { + if (!producerDone.get()) { + message.setLongProperty("PID", producedCount.get() + 1); + messageProducer.send(message); + producedCount.incrementAndGet(); + } else { + // just hang about and let the consumer listener work + TimeUnit.SECONDS.sleep(5); + } + } + } catch (JMSException | InterruptedException ignored) { + } + } + } + + public long getLastProduced() { + return producedCount.get(); + } + + public long getLastConsumed() { + return lastConsumed; + } + } + + MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer(); + + // hardwire authenticaton to map USER to EQ_USER etc + final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() { + @Override + public Subject authenticate(String user, + String password, + RemotingConnection remotingConnection, + String securityDomain) { + Subject subject = null; + if (validateUser(user, password)) { + subject = new Subject(); + subject.getPrincipals().add(new UserPrincipal(user)); + subject.getPrincipals().add(new RolePrincipal("EQ_" + user)); + if (user.equals("BOTH")) { + subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER")); + subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER")); + } + } + return subject; + } + + @Override + public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) { + return true; + } + + @Override + public boolean validateUser(final String username, final String password) { + return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")); + } + + @Override + public boolean validateUserAndRole(final String username, + final String password, + final Set<Role> requiredRoles, + final CheckType checkType) { + if (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")) { + return true; + } + return false; + } + }; + + final ObjectNameBuilder node0NameBuilder = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true); + final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true); + + + /* + use case is dispatch from memory, with non-blocking producers + producers add to the head of the broker chain, consumers receive from the tail + when head == tail we are back to one broker for that address, the end of the chain + */ + private void prepareNodesAndStartCombinedHeadTail() throws Exception { + prepareNodesAndStartCombinedHeadTail(1000, 300); + } + + private void prepareNodesAndStartCombinedHeadTail(int credit, int creditMin) throws Exception { + Assert.assertTrue(credit > 0); + + AddressSettings blockingQueue = new AddressSettings(); + blockingQueue.setMaxSizeBytes(100 * 1024) + .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL) + .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(TimeUnit.MILLISECONDS.toSeconds(opTimeoutMillis)) + .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in! 
+ + Configuration baseConfig = new ConfigurationImpl(); + baseConfig.getAddressesSettings().put(qName, blockingQueue); + + BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration(); + balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix + baseConfig.addBalancerConfiguration(balancerConfiguration); + + // prepare two nodes + for (int nodeId = 0; nodeId < 2; nodeId++) { + Configuration configuration = baseConfig.copy(); + configuration.setName("Node" + nodeId); + configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName())); + configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=" + credit + ";amqpMinCredits=" + creditMin); + nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration)); + nodes.get(nodeId).setSecurityManager(customSecurityManager); + nodes.get(nodeId).setMbeanServer(mBeanServer); + } + + // node0 initially handles both producer & consumer (head & tail) + nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER"); + nodes.get(0).start(); + } + + @Test (timeout = 20000) + public void testScale0_1() throws Exception { + + prepareNodesAndStartCombinedHeadTail(); + + // slow consumer, delay on each message received + EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes)); + eqConsumer.start(); + + // verify consumer reconnects on no messages + assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1)); + + EQProducer eqProducer = new EQProducer(urlForNodes(nodes)); + eqProducer.start(); + + // verify producer reconnects on fail full! + assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1)); Review comment: Why not use Wait.assertTrue? 
########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java ########## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.balancing; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.security.auth.Subject; +import java.io.File; +import java.net.URI; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; +import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl; +import 
org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5; +import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal; +import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal; +import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.junit.Assert; +import org.junit.Test; + +public class ElasticQueueTest extends ActiveMQTestBase { + + static final String qName = "EQ"; + static final SimpleString qNameSimple = SimpleString.toSimpleString(qName); + + final int base_port = 61616; + final int opTimeoutMillis = 2000; + final Stack<EmbeddedActiveMQ> nodes = new Stack<>(); + private final String balancerConfigName = "role_name_sharder"; + + String urlForNodes(Stack<EmbeddedActiveMQ> nodes) { + StringBuilder builder = new 
StringBuilder("failover:("); + int port_start = base_port; + for (EmbeddedActiveMQ activeMQ : nodes) { + if (port_start != base_port) { + builder.append(","); + } + builder.append("amqp://localhost:" + (port_start++)); + } + // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit + builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis); + return builder.toString(); + } + + // allow tracking of failover reconnects + class ConnectionListener implements JmsConnectionListener { + + AtomicInteger connectionCount; + + ConnectionListener(AtomicInteger connectionCount) { + this.connectionCount = connectionCount; + } + + @Override + public void onConnectionEstablished(URI uri) { + } + + @Override + public void onConnectionFailure(Throwable throwable) { + } + + @Override + public void onConnectionInterrupted(URI uri) { + } + + @Override + public void onConnectionRestored(URI uri) { + connectionCount.incrementAndGet(); + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) { + } + + @Override + public void onSessionClosed(Session session, Throwable throwable) { + } + + @Override + public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) { + } + + @Override + public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) { + } + } + + // slow consumer + class EQConsumer extends Thread { + + final AtomicInteger consumedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + private final String url; + private final int delayMillis; + long lastConsumed = 0; + + EQConsumer(String url) { + this(url, 1000); + } + + EQConsumer(String url, int delay) { + this.url = url; + this.delayMillis = delay; + } + + @Override + public void run() { + + while (!done.get()) { + JmsConnectionFactory factory = new 
JmsConnectionFactory("CONSUMER", "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName)); + + while (!done.get()) { + Message receivedMessage = messageConsumer.receiveNoWait(); + if (receivedMessage != null) { + consumedCount.incrementAndGet(); + lastConsumed = receivedMessage.getLongProperty("PID"); + } else { + TimeUnit.MILLISECONDS.sleep(delayMillis); + } + } + } catch (JMSException | InterruptedException e) { + } + } + } + + public long getLastConsumed() { + return lastConsumed; + } + } + + // regular producer + class EQProducer extends Thread { + + final AtomicInteger producedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + private final String url; + + EQProducer(String url) { + this.url = url; + } + + @Override + public void run() { + + while (!done.get()) { + JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(session.createQueue(qName)); + + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[1024]); + while (!done.get()) { + message.setLongProperty("PID", producedCount.get() + 1); + messageProducer.send(message); + 
producedCount.incrementAndGet(); + } + } catch (ResourceAllocationException expected) { + } catch (JMSException e) { + } + } + } + + public long getLastProduced() { + return producedCount.get(); + } + } + + // combined producer/ async consumer + class EQProducerAsyncConsumer extends Thread { + + final AtomicInteger producedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + final AtomicBoolean producerDone = new AtomicBoolean(); + private final String url; + final AtomicInteger consumedCount = new AtomicInteger(); + private final String user; + private long lastConsumed; + + EQProducerAsyncConsumer(String url, String user) { + this.url = url; + this.user = user; + } + + @Override + public void run() { + + while (!done.get()) { + JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName)); + // consume async + messageConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + consumedCount.incrementAndGet(); + try { + lastConsumed = message.getLongProperty("PID"); + if (!producerDone.get()) { + TimeUnit.SECONDS.sleep(1); + } + message.acknowledge(); + } catch (JMSException | InterruptedException ignored) { + } + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(session.createQueue(qName)); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new 
byte[1024]); + while (!done.get()) { + if (!producerDone.get()) { + message.setLongProperty("PID", producedCount.get() + 1); + messageProducer.send(message); + producedCount.incrementAndGet(); + } else { + // just hang about and let the consumer listener work + TimeUnit.SECONDS.sleep(5); + } + } + } catch (JMSException | InterruptedException ignored) { + } + } + } + + public long getLastProduced() { + return producedCount.get(); + } + + public long getLastConsumed() { + return lastConsumed; + } + } + + MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer(); + + // hardwire authenticaton to map USER to EQ_USER etc + final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() { + @Override + public Subject authenticate(String user, + String password, + RemotingConnection remotingConnection, + String securityDomain) { + Subject subject = null; + if (validateUser(user, password)) { + subject = new Subject(); + subject.getPrincipals().add(new UserPrincipal(user)); + subject.getPrincipals().add(new RolePrincipal("EQ_" + user)); + if (user.equals("BOTH")) { + subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER")); + subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER")); + } + } + return subject; + } + + @Override + public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) { + return true; + } + + @Override + public boolean validateUser(final String username, final String password) { + return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")); + } + + @Override + public boolean validateUserAndRole(final String username, + final String password, + final Set<Role> requiredRoles, + final CheckType checkType) { + if (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")) { + return true; + } + return false; + } + }; + + final ObjectNameBuilder node0NameBuilder = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true); + final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true); + + + /* + use case is dispatch from memory, with non-blocking producers + producers add to the head of the broker chain, consumers receive from the tail + when head == tail we are back to one broker for that address, the end of the chain + */ + private void prepareNodesAndStartCombinedHeadTail() throws Exception { + prepareNodesAndStartCombinedHeadTail(1000, 300); + } + + private void prepareNodesAndStartCombinedHeadTail(int credit, int creditMin) throws Exception { + Assert.assertTrue(credit > 0); + + AddressSettings blockingQueue = new AddressSettings(); + blockingQueue.setMaxSizeBytes(100 * 1024) + .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL) + .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(TimeUnit.MILLISECONDS.toSeconds(opTimeoutMillis)) + .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in! 
+ + Configuration baseConfig = new ConfigurationImpl(); + baseConfig.getAddressesSettings().put(qName, blockingQueue); + + BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration(); + balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix + baseConfig.addBalancerConfiguration(balancerConfiguration); + + // prepare two nodes + for (int nodeId = 0; nodeId < 2; nodeId++) { + Configuration configuration = baseConfig.copy(); + configuration.setName("Node" + nodeId); + configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName())); + configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=" + credit + ";amqpMinCredits=" + creditMin); + nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration)); + nodes.get(nodeId).setSecurityManager(customSecurityManager); + nodes.get(nodeId).setMbeanServer(mBeanServer); + } + + // node0 initially handles both producer & consumer (head & tail) + nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER"); + nodes.get(0).start(); + } + + @Test (timeout = 20000) + public void testScale0_1() throws Exception { + + prepareNodesAndStartCombinedHeadTail(); + + // slow consumer, delay on each message received + EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes)); + eqConsumer.start(); + + // verify consumer reconnects on no messages + assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1)); + + EQProducer eqProducer = new EQProducer(urlForNodes(nodes)); + eqProducer.start(); + + // verify producer reconnects on fail full! 
+ assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1)); + + // operator mode, poll queue control - to allow producer to continue, activate next broker in the 'chain' + AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer); + + AddressControl finalAddressControl = addressControl0; + assertTrue(Wait.waitFor(() -> { + int usage = finalAddressControl.getAddressLimitPercent(); + System.out.println("Node0 usage % " + usage); + return usage == 100; Review comment: I guess the usage percent could never be 100. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
