http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java new file mode 100644 index 0000000..acfc0f6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.XASession; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.activemq.ActiveMQXAConnection; +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerRestartTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.transport.failover.FailoverTransport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Test for AMQ-4950. + * Simulates an error during XA prepare call. + */ +public class AMQ4950Test extends BrokerRestartTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AMQ4950Test.class); + protected static final String simulatedExceptionMessage = "Simulating error inside tx prepare()."; + public boolean prioritySupport = false; + protected String connectionUri = null; + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + broker.setDestinationPolicy(policyMap); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(false); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + broker.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + + @Override + public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { + getNext().prepareTransaction(context, xid); + LOG.debug("BrokerPlugin.prepareTransaction() will throw an exception."); + throw new XAException(simulatedExceptionMessage); + } + + @Override + public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { + LOG.debug("BrokerPlugin.commitTransaction()."); + super.commitTransaction(context, xid, onePhase); + } + } + }); + } + + /** + * Creates XA transaction and invokes XA prepare(). + * Due to registered BrokerFilter prepare will be handled by broker + * but then throw an exception. + * Prior to fixing AMQ-4950, this resulted in a ClassCastException + * in ConnectionStateTracker.PrepareReadonlyTransactionAction.onResponse() + * causing the failover transport to reconnect and replay the XA prepare(). + */ + public void testXAPrepareFailure() throws Exception { + + assertNotNull(connectionUri); + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + connectionUri + ")"); + ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf.createConnection(); + xaConnection.start(); + XASession session = xaConnection.createXASession(); + XAResource resource = session.getXAResource(); + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + + MessageProducer producer = session.createProducer(session.createQueue(this.getClass().getName())); + Message message = session.createTextMessage("Sample Message"); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + try { + LOG.debug("Calling XA prepare(), expecting an exception"); + int ret = resource.prepare(tid); + if (XAResource.XA_OK == ret) + resource.commit(tid, false); + } catch (XAException xae) { + LOG.info("Received excpected XAException: {}", xae.getMessage()); + LOG.info("Rolling back transaction {}", tid); + + // with bug AMQ-4950 the thrown error reads "Cannot call prepare now" + // we check that we receive the original exception message as + // thrown by the BrokerPlugin + assertEquals(simulatedExceptionMessage, xae.getMessage()); + resource.rollback(tid); + } + // couple of assertions + assertTransactionGoneFromBroker(tid); + assertTransactionGoneFromConnection(broker.getBrokerName(), xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid); + assertTransactionGoneFromFailoverState(xaConnection, tid); + + //cleanup + producer.close(); + session.close(); + xaConnection.close(); + LOG.debug("testXAPrepareFailure() finished."); + } + + + public Xid createXid() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + } + + + private void assertTransactionGoneFromFailoverState( + ActiveMQXAConnection connection1, Xid tid) throws Exception { + + FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class); + TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE); + assertNull("transaction should not exist in the state tracker", + transport.getStateTracker().processCommitTransactionOnePhase(info)); + } + + + private void assertTransactionGoneFromBroker(Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup("localhost"); + TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class); + try { + transactionBroker.getTransaction(null, new XATransactionId(tid), false); + fail("expected exception on tx not found"); + } catch (XAException expectedOnNotFound) { + } + } + + + private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); + CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections(); + for (TransportConnection connection: connections) { + if (connection.getConnectionId().equals(clientId)) { + try { + connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE)); + fail("did not get expected excepton on missing transaction, it must be still there in error!"); + } catch (IllegalStateException expectedOnNoTransaction) { + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java new file mode 100644 index 0000000..6a52e46 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java @@ -0,0 +1,505 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.net.URI; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.sql.DataSource; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.Wait; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +/** + * Test creates a broker network with two brokers - producerBroker (with a + * message producer attached) and consumerBroker (with consumer attached) + * <p/> + * Simulates network duplicate message by stopping and restarting the + * consumerBroker after message (with message ID ending in 120) is persisted to + * consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the + * network connection. When the network connection is reestablished the + * producerBroker resends message (with messageID ending in 120). + * <p/> + * Expectation: + * <p/> + * With the following policy entries set, would expect the duplicate message to + * be read from the store and dispatched to the consumer - where the duplicate + * could be detected by consumer. + * <p/> + * PolicyEntry policy = new PolicyEntry(); policy.setQueue(">"); + * policy.setEnableAudit(false); policy.setUseCache(false); + * policy.setExpireMessagesPeriod(0); + * <p/> + * <p/> + * Note 1: Network needs to use replaywhenNoConsumers so enabling the + * networkAudit to avoid this scenario is not feasible. + * <p/> + * NOTE 2: Added a custom plugin to the consumerBroker so that the + * consumerBroker shutdown will occur after a message has been persisted to + * consumerBroker store but before an ACK is sent back to ProducerBroker. This + * is just a hack to ensure producerBroker will resend the message after + * shutdown. + */ + +@RunWith(value = Parameterized.class) +public class AMQ4952Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class); + + protected static final int MESSAGE_COUNT = 1; + + protected BrokerService consumerBroker; + protected BrokerService producerBroker; + + protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store"); + + private final CountDownLatch stopConsumerBroker = new CountDownLatch(1); + private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1); + private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1); + + private EmbeddedDataSource localDataSource; + + @Parameterized.Parameter(0) + public boolean enableCursorAudit; + + @Parameterized.Parameters(name = "enableAudit={0}") + public static Iterable<Object[]> getTestParameters() { + return Arrays.asList(new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }); + } + + @Test + public void testConsumerBrokerRestart() throws Exception { + + Callable consumeMessageTask = new Callable() { + @Override + public Object call() throws Exception { + + int receivedMessageCount = 0; + + ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false"); + Connection consumerConnection = consumerFactory.createConnection(); + + try { + + consumerConnection.setClientID("consumer"); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME); + + while (true) { + TextMessage textMsg = (TextMessage) messageConsumer.receive(5000); + + if (textMsg == null) { + return receivedMessageCount; + } + + receivedMessageCount++; + LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID()); + + // on first delivery ensure the message is pending an + // ack when it is resent from the producer broker + if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) { + LOG.info("Waiting for restart..."); + consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS); + } + + textMsg.acknowledge(); + } + } finally { + consumerConnection.close(); + } + } + }; + + Runnable consumerBrokerResetTask = new Runnable() { + @Override + public void run() { + + try { + // wait for signal + stopConsumerBroker.await(); + + LOG.info("********* STOPPING CONSUMER BROKER"); + + consumerBroker.stop(); + consumerBroker.waitUntilStopped(); + + LOG.info("***** STARTING CONSUMER BROKER"); + // do not delete messages on startup + consumerBroker = createConsumerBroker(false); + + LOG.info("***** CONSUMER BROKER STARTED!!"); + consumerBrokerRestarted.countDown(); + + assertTrue("message forwarded on time", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("ProducerBroker totalMessageCount: " + producerBroker.getAdminView().getTotalMessageCount()); + return producerBroker.getAdminView().getTotalMessageCount() == 0; + } + })); + consumerRestartedAndMessageForwarded.countDown(); + + } catch (Exception e) { + LOG.error("Exception when stopping/starting the consumerBroker ", e); + } + + } + }; + + ExecutorService executor = Executors.newFixedThreadPool(2); + + // start consumerBroker start/stop task + executor.execute(consumerBrokerResetTask); + + // start consuming messages + Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask); + + produceMessages(); + + // Wait for consumer to finish + int totalMessagesConsumed = numberOfConsumedMessage.get(); + + StringBuffer contents = new StringBuffer(); + boolean messageInStore = isMessageInJDBCStore(localDataSource, contents); + LOG.debug("****number of messages received " + totalMessagesConsumed); + + assertEquals("number of messages received", 2, totalMessagesConsumed); + assertEquals("messages left in store", true, messageInStore); + assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ")); + } + + private void produceMessages() throws JMSException { + + ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false"); + Connection producerConnection = producerFactory.createConnection(); + + try { + producerConnection.setClientID("producer"); + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final MessageProducer remoteProducer = producerSession.createProducer(QUEUE_NAME); + + int i = 0; + while (MESSAGE_COUNT > i) { + String payload = "test msg " + i; + TextMessage msg = producerSession.createTextMessage(payload); + remoteProducer.send(msg); + i++; + } + + } finally { + producerConnection.close(); + } + } + + @Before + public void setUp() throws Exception { + LOG.debug("Running with enableCursorAudit set to {}", this.enableCursorAudit); + doSetUp(); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + protected void doTearDown() throws Exception { + + try { + producerBroker.stop(); + } catch (Exception ex) { + } + try { + consumerBroker.stop(); + } catch (Exception ex) { + } + } + + protected void doSetUp() throws Exception { + producerBroker = createProducerBroker(); + consumerBroker = createConsumerBroker(true); + } + + /** + * Producer broker listens on localhost:2003 networks to consumerBroker - + * localhost:2006 + * + * @return + * @throws Exception + */ + protected BrokerService createProducerBroker() throws Exception { + + String networkToPorts[] = new String[] { "2006" }; + HashMap<String, String> networkProps = new HashMap<String, String>(); + + networkProps.put("networkTTL", "10"); + networkProps.put("conduitSubscriptions", "true"); + networkProps.put("decreaseNetworkConsumerPriority", "true"); + networkProps.put("dynamicOnly", "true"); + + BrokerService broker = new BrokerService(); + broker.getManagementContext().setCreateConnector(false); + broker.setDeleteAllMessagesOnStartup(true); + broker.setBrokerName("BP"); + broker.setAdvisorySupport(false); + + // lazy init listener on broker start + TransportConnector transportConnector = new TransportConnector(); + transportConnector.setUri(new URI("tcp://localhost:2003")); + List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>(); + transportConnectors.add(transportConnector); + broker.setTransportConnectors(transportConnectors); + + // network to consumerBroker + + if (networkToPorts != null && networkToPorts.length > 0) { + StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false"); + NetworkConnector nc = broker.addNetworkConnector(builder.toString()); + if (networkProps != null) { + IntrospectionSupport.setProperties(nc, networkProps); + } + nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination> asList(new ActiveMQQueue[] { QUEUE_NAME })); + } + + // Persistence adapter + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + EmbeddedDataSource remoteDataSource = new EmbeddedDataSource(); + remoteDataSource.setDatabaseName("target/derbyDBRemoteBroker"); + remoteDataSource.setCreateDatabase("create"); + jdbc.setDataSource(remoteDataSource); + broker.setPersistenceAdapter(jdbc); + + // set Policy entries + PolicyEntry policy = new PolicyEntry(); + + policy.setQueue(">"); + policy.setEnableAudit(false); + policy.setUseCache(false); + policy.setExpireMessagesPeriod(0); + + // set replay with no consumers + ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); + conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); + policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + + broker.start(); + broker.waitUntilStarted(); + + return broker; + } + + /** + * consumerBroker - listens on localhost:2006 + * + * @param deleteMessages + * - drop messages when broker instance is created + * @return + * @throws Exception + */ + protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception { + + String scheme = "tcp"; + String listenPort = "2006"; + + BrokerService broker = new BrokerService(); + broker.getManagementContext().setCreateConnector(false); + broker.setDeleteAllMessagesOnStartup(deleteMessages); + broker.setBrokerName("BC"); + // lazy init listener on broker start + TransportConnector transportConnector = new TransportConnector(); + transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort)); + List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>(); + transportConnectors.add(transportConnector); + broker.setTransportConnectors(transportConnectors); + + // policy entries + + PolicyEntry policy = new PolicyEntry(); + + policy.setQueue(">"); + policy.setEnableAudit(enableCursorAudit); + policy.setExpireMessagesPeriod(0); + + // set replay with no consumers + ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); + conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); + policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); + + PolicyMap pMap = new PolicyMap(); + + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + + // Persistence adapter + JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter(); + EmbeddedDataSource localDataSource = new EmbeddedDataSource(); + localDataSource.setDatabaseName("target/derbyDBLocalBroker"); + localDataSource.setCreateDatabase("create"); + localJDBCPersistentAdapter.setDataSource(localDataSource); + broker.setPersistenceAdapter(localJDBCPersistentAdapter); + + if (deleteMessages) { + // no plugin on restart + broker.setPlugins(new BrokerPlugin[] { new MyTestPlugin() }); + } + + this.localDataSource = localDataSource; + + broker.start(); + broker.waitUntilStarted(); + + return broker; + } + + /** + * Query JDBC Store to see if messages are left + * + * @param dataSource + * @return + * @throws SQLException + */ + private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException { + + boolean tableHasData = false; + String query = "select * from ACTIVEMQ_MSGS"; + + java.sql.Connection conn = dataSource.getConnection(); + PreparedStatement s = conn.prepareStatement(query); + + ResultSet set = null; + + try { + StringBuffer headers = new StringBuffer(); + set = s.executeQuery(); + ResultSetMetaData metaData = set.getMetaData(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + + if (i == 1) { + headers.append("||"); + } + headers.append(metaData.getColumnName(i) + "||"); + } + LOG.error(headers.toString()); + + while (set.next()) { + tableHasData = true; + + for (int i = 1; i <= metaData.getColumnCount(); i++) { + if (i == 1) { + stringBuffer.append("|"); + } + stringBuffer.append(set.getString(i) + "|"); + } + LOG.error(stringBuffer.toString()); + } + } finally { + try { + set.close(); + } catch (Throwable ignore) { + } + try { + s.close(); + } catch (Throwable ignore) { + } + + conn.close(); + } + + return tableHasData; + } + + /** + * plugin used to ensure consumerbroker is restared before the network + * message from producerBroker is acked + */ + class MyTestPlugin implements BrokerPlugin { + + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new MyTestBroker(broker); + } + } + + class MyTestBroker extends BrokerFilter { + + public MyTestBroker(Broker next) { + super(next); + } + + @Override + public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception { + + super.send(producerExchange, messageSend); + LOG.error("Stopping broker on send: " + messageSend.getMessageId().getProducerSequenceId()); + stopConsumerBroker.countDown(); + producerExchange.getConnectionContext().setDontSendReponse(true); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java new file mode 100644 index 0000000..13ddd30 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ5035Test { + + private static final String CLIENT_ID = "amq-test-client-id"; + private static final String DURABLE_SUB_NAME = "testDurable"; + + private final String xbean = "xbean:"; + private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035"; + + private static BrokerService brokerService; + private String connectionUri; + + @Before + public void setUp() throws Exception { + brokerService = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml"); + connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testFoo() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + Connection connection = factory.createConnection(); + connection.setClientID(CLIENT_ID); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("Test.Topic"); + MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME); + consumer.close(); + + BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME); + brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME); + } + + private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException { + ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); + BrokerViewMBean view = (BrokerViewMBean) brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); + assertNotNull(view); + return view; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java new file mode 100644 index 0000000..c2cb11e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ5136Test { + + BrokerService brokerService; + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.start(); + } + + @After + public void stopBroker() throws Exception { + brokerService.stop(); + } + + @Test + public void memoryUsageOnCommit() throws Exception { + sendMessagesAndAssertMemoryUsage(new TransactionHandler() { + @Override + public void finishTransaction(Session session) throws JMSException { + session.commit(); + } + }); + } + + @Test + public void memoryUsageOnRollback() throws Exception { + sendMessagesAndAssertMemoryUsage(new TransactionHandler() { + @Override + public void finishTransaction(Session session) throws JMSException { + session.rollback(); + } + }); + } + + private void sendMessagesAndAssertMemoryUsage(TransactionHandler transactionHandler) throws Exception { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic destination = session.createTopic("ActiveMQBug"); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < 100; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(generateBytes()); + producer.send(message); + transactionHandler.finishTransaction(session); + } + connection.close(); + org.junit.Assert.assertEquals(0, BrokerRegistry.getInstance().findFirst().getSystemUsage().getMemoryUsage().getPercentUsage()); + } + + private byte[] generateBytes() { + byte[] bytes = new byte[100000]; + for (int i = 0; i < 100000; i++) { + bytes[i] = (byte) i; + } + return bytes; + } + + private static interface TransactionHandler { + void finishTransaction(Session session) throws JMSException; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java new file mode 100644 index 0000000..4c07655 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +@RunWith(value = Parameterized.class) +public class AMQ5212Test { + + BrokerService brokerService; + + @Parameterized.Parameter(0) + public boolean concurrentStoreAndDispatchQ = true; + + @Parameterized.Parameters(name = "concurrentStoreAndDispatch={0}") + public static Iterable<Object[]> getTestParameters() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + + @Before + public void setUp() throws Exception { + start(true); + } + + public void start(boolean deleteAllMessages) throws Exception { + brokerService = new BrokerService(); + if (deleteAllMessages) { + brokerService.deleteAllMessages(); + } + ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQ); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setAdvisorySupport(false); + brokerService.start(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + } + + @Test + public void verifyDuplicateSuppressionWithConsumer() throws Exception { + doVerifyDuplicateSuppression(100, 100, true); + } + + @Test + public void verifyDuplicateSuppression() throws Exception { + doVerifyDuplicateSuppression(100, 100, false); + } + + public void doVerifyDuplicateSuppression(final int numToSend, final int expectedTotalEnqueue, final boolean demand) throws Exception { + final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + final int concurrency = 40; + final AtomicInteger workCount = new AtomicInteger(numToSend); + ExecutorService executorService = Executors.newFixedThreadPool(concurrency); + for (int i = 0; i < concurrency; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + int i; + while ((i = workCount.getAndDecrement()) > 0) { + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("queue-" + i + "-" + + AMQ5212Test.class.getSimpleName()); + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + if (demand) { + // create demand so page in will happen + activeMQSession.createConsumer(dest); + } + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + activeMQConnection.close(); + + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + TimeUnit.SECONDS.sleep(1); + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return expectedTotalEnqueue == brokerService.getAdminView().getTotalEnqueueCount(); + } + }); + assertEquals("total enqueue as expected", expectedTotalEnqueue, brokerService.getAdminView().getTotalEnqueueCount()); + } + + @Test + public void verifyConsumptionOnDuplicate() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("Q"); + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + + activeMQConnection.close(); + + // verify original can be consumed after restart + brokerService.stop(); + brokerService.start(false); + + connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); + Message received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + + activeMQConnection.close(); + } + + @Test + public void verifyClientAckConsumptionOnDuplicate() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("Q"); + + MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); + + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + + + Message received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + messageConsumer.close(); + + + messageConsumer = activeMQSession.createConsumer(dest); + received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + received.acknowledge(); + + activeMQConnection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java new file mode 100644 index 0000000..0d7f44b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java @@ -0,0 +1,602 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; + +/** + Non transactional concurrent producer/consumer to single dest + */ +@RunWith(Parameterized.class) +public class AMQ5266SingleDestTest { + static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class); + String activemqURL; + BrokerService brokerService; + + public int numDests = 1; + public int messageSize = 10*1000; + + @Parameterized.Parameter(0) + public int publisherMessagesPerThread = 1000; + + @Parameterized.Parameter(1) + public int publisherThreadCount = 20; + + @Parameterized.Parameter(2) + public int consumerThreadsPerQueue = 5; + + @Parameterized.Parameter(3) + public int destMemoryLimit = 50 * 1024; + + @Parameterized.Parameter(4) + public boolean useCache = true; + + @Parameterized.Parameter(5) + public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB; + + @Parameterized.Parameter(6) + public boolean optimizeDispatch = false; + + @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") + public static Iterable<Object[]> parameters() { + return Arrays.asList(new Object[][]{ + {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, + {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, + }); + } + + public int consumerBatchSize = 25; + + @BeforeClass + public static void derbyTestMode() throws Exception { + System.setProperty("derby.system.durability","test"); + } + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + + TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setUseJmx(false); + brokerService.setAdvisorySupport(false); + + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract! + defaultEntry.setMaxProducersToAudit(publisherThreadCount); + defaultEntry.setEnableAudit(true); + defaultEntry.setUseCache(useCache); + defaultEntry.setMaxPageSize(1000); + defaultEntry.setOptimizedDispatch(optimizeDispatch); + defaultEntry.setMemoryLimit(destMemoryLimit); + defaultEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); + + brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024); + + TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + activemqURL = transportConnector.getPublishableConnectString(); + activemqURL += "?jms.watchTopicAdvisories=false"; // ensure all messages are queue or dlq messages + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void test() throws Exception { + + String activemqQueues = "activemq"; + for (int i=1;i<numDests;i++) { + activemqQueues +=",activemq"+i; + } + + int consumerWaitForConsumption = 5 * 60 * 1000; + + ExportQueuePublisher publisher = null; + ExportQueueConsumer consumer = null; + + LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified."); + LOG.info("\nBuilding Publisher..."); + + publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount); + + LOG.info("Building Consumer..."); + + consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount); + + long totalStart = System.currentTimeMillis(); + + LOG.info("Starting Publisher..."); + + publisher.start(); + + LOG.info("Starting Consumer..."); + + consumer.start(); + + int distinctPublishedCount = 0; + + + LOG.info("Waiting For Publisher Completion..."); + + publisher.waitForCompletion(); + + List publishedIds = publisher.getIDs(); + distinctPublishedCount = new TreeSet(publishedIds).size(); + + LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount); + LOG.info("Publisher duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart)); + + + long endWait = System.currentTimeMillis() + consumerWaitForConsumption; + while (!consumer.completed() && System.currentTimeMillis() < endWait) { + try { + int secs = (int) (endWait - System.currentTimeMillis()) / 1000; + LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); + Thread.sleep(1000); + } catch (Exception e) { + } + } + + LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down."); + + LOG.info("Total duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart)); + + consumer.shutdown(); + + TimeUnit.SECONDS.sleep(2); + + LOG.info("Consumer Stats:"); + + for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) { + + List<String> idList = entry.getValue(); + + int distinctConsumed = new TreeSet<String>(idList).size(); + + StringBuilder sb = new StringBuilder(); + sb.append(" Queue: " + entry.getKey() + + " -> Total Messages Consumed: " + idList.size() + + ", Distinct IDs Consumed: " + distinctConsumed); + + int diff = distinctPublishedCount - distinctConsumed; + sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) "); + LOG.info(sb.toString()); + + assertEquals("expect to get all messages!", 0, diff); + + } + + // verify empty dlq + assertEquals("No pending messages", 0l, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + } + + public class ExportQueuePublisher { + + private final String amqUser = ActiveMQConnection.DEFAULT_USER; + private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; + private ActiveMQConnectionFactory connectionFactory = null; + private String activemqURL = null; + private String activemqQueues = null; + // Collection of distinct IDs that the publisher has published. + // After a message is published, its UUID will be written to this list for tracking. + // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs. + //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>()); + private List<String> ids = Collections.synchronizedList(new ArrayList<String>()); + private List<PublisherThread> threads; + + public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception { + + this.activemqURL = activemqURL; + this.activemqQueues = activemqQueues; + + threads = new ArrayList<PublisherThread>(); + + // Build the threads and tell them how many messages to publish + for (int i = 0; i < threadCount; i++) { + PublisherThread pt = new PublisherThread(messagesPerThread); + threads.add(pt); + } + } + + public List<String> getIDs() { + return ids; + } + + // Kick off threads + public void start() throws Exception { + + for (PublisherThread pt : threads) { + pt.start(); + } + } + + // Wait for threads to complete. They will complete once they've published all of their messages. + public void waitForCompletion() throws Exception { + + for (PublisherThread pt : threads) { + pt.join(); + pt.close(); + } + } + + private Session newSession(QueueConnection queueConnection) throws Exception { + return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private synchronized QueueConnection newQueueConnection() throws Exception { + + if (connectionFactory == null) { + connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); + } + + // Set the redelivery count to -1 (infinite), or else messages will start dropping + // after the queue has had a certain number of failures (default is 6) + RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); + policy.setMaximumRedeliveries(-1); + + QueueConnection amqConnection = connectionFactory.createQueueConnection(); + amqConnection.start(); + return amqConnection; + } + + private class PublisherThread extends Thread { + + private int count; + private QueueConnection qc; + private Session session; + private MessageProducer mp; + + private PublisherThread(int count) throws Exception { + + this.count = count; + + // Each Thread has its own Connection and Session, so no sync worries + qc = newQueueConnection(); + session = newSession(qc); + + // In our code, when publishing to multiple queues, + // we're using composite destinations like below + Queue q = new ActiveMQQueue(activemqQueues); + mp = session.createProducer(q); + } + + public void run() { + + try { + + // Loop until we've published enough messages + while (count-- > 0) { + + TextMessage tm = session.createTextMessage(getMessageText()); + String id = UUID.randomUUID().toString(); + tm.setStringProperty("KEY", id); + ids.add(id); // keep track of the key to compare against consumer + + mp.send(tm); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + // Called by waitForCompletion + public void close() { + + try { + mp.close(); + } catch (Exception e) { + } + + try { + session.close(); + } catch (Exception e) { + } + + try { + qc.close(); + } catch (Exception e) { + } + } + } + + } + + String messageText; + private String getMessageText() { + + if (messageText == null) { + + synchronized (this) { + + if (messageText == null) { + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i++) { + sb.append("X"); + } + messageText = sb.toString(); + } + } + } + + return messageText; + } + + + public class ExportQueueConsumer { + + private final String amqUser = ActiveMQConnection.DEFAULT_USER; + private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; + private final int totalToExpect; + private ActiveMQConnectionFactory connectionFactory = null; + private String activemqURL = null; + private String activemqQueues = null; + private String[] queues = null; + // Map of IDs that were consumed, keyed by queue name. + // We'll compare these against what was published to know if any got stuck or dropped. + private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>(); + private Map<String, List<ConsumerThread>> threads; + + public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception { + + this.activemqURL = activemqURL; + this.activemqQueues = activemqQueues; + this.totalToExpect = totalToExpect; + + queues = this.activemqQueues.split(","); + + for (int i = 0; i < queues.length; i++) { + queues[i] = queues[i].trim(); + } + + threads = new HashMap<String, List<ConsumerThread>>(); + + // For each queue, create a list of threads and set up the list of ids + for (String q : queues) { + + List<ConsumerThread> list = new ArrayList<ConsumerThread>(); + + idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>())); + + for (int i = 0; i < threadsPerQueue; i++) { + list.add(new ConsumerThread(q, batchSize)); + } + + threads.put(q, list); + } + } + + public Map<String, List<String>> getIDs() { + return idsByQueue; + } + + // Start the threads + public void start() throws Exception { + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.start(); + } + } + } + + // Tell the threads to stop + // Then wait for them to stop + public void shutdown() throws Exception { + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.shutdown(); + } + } + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.join(); + } + } + } + + private Session newSession(QueueConnection queueConnection) throws Exception { + return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private synchronized QueueConnection newQueueConnection() throws Exception { + + if (connectionFactory == null) { + connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); + } + + // Set the redelivery count to -1 (infinite), or else messages will start dropping + // after the queue has had a certain number of failures (default is 6) + RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); + policy.setMaximumRedeliveries(-1); + + QueueConnection amqConnection = connectionFactory.createQueueConnection(); + amqConnection.start(); + return amqConnection; + } + + public boolean completed() { + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + if (ct.isAlive()) { + LOG.info("thread for {} is still alive.", ct.qName); + return false; + } + } + } + return true; + } + + private class ConsumerThread extends Thread { + + private int batchSize; + private QueueConnection qc; + private Session session; + private MessageConsumer mc; + private List<String> idList; + private boolean shutdown = false; + private String qName; + + private ConsumerThread(String queueName, int batchSize) throws Exception { + + this.batchSize = batchSize; + + // Each thread has its own connection and session + qName = queueName; + qc = newQueueConnection(); + session = newSession(qc); + Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize); + mc = session.createConsumer(q); + + idList = idsByQueue.get(queueName); + } + + public void run() { + + try { + + int count = 0; + + // Keep reading as long as it hasn't been told to shutdown + while (!shutdown) { + + if (idList.size() >= totalToExpect) { + LOG.info("Got {} for q: {}", +idList.size(), qName); + break; + } + Message m = mc.receive(4000); + + if (m != null) { + + // We received a non-null message, add the ID to our list + + idList.add(m.getStringProperty("KEY")); + + count++; + + // If we've reached our batch size, commit the batch and reset the count + + if (count == batchSize) { + count = 0; + } + } else { + + // We didn't receive anything this time, commit any current batch and reset the count + + count = 0; + + // Sleep a little before trying to read after not getting a message + + try { + if (idList.size() < totalToExpect) { + LOG.info("did not receive on {}, current count: {}", qName, idList.size()); + } + //sleep(3000); + } catch (Exception e) { + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + + // Once we exit, close everything + close(); + } + } + + public void shutdown() { + shutdown = true; + } + + public void close() { + + try { + mc.close(); + } catch (Exception e) { + } + + try { + session.close(); + } catch (Exception e) { + } + + try { + qc.close(); + } catch (Exception e) { + + } + } + } + } +}
