http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java new file mode 100644 index 0000000..e71cfe6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.broker.ft;

import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Variant of {@link DbRestartJDBCQueueMasterSlaveLeaseTest} that installs a default
 * {@link LeaseLockerIOExceptionHandler} on each broker and restarts the database after a
 * fixed delay ({@link #restartDelay}) rather than waiting for the master to stop.
 */
public class DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.class);

    /** Milliseconds the database stays offline; reset to 2000 by {@link #setUp()}. */
    private long restartDelay = 2000;

    /**
     * Installs a {@link LeaseLockerIOExceptionHandler} with its default settings.
     *
     * @param brokerService the broker (master or slave) being configured
     */
    @Override
    protected void configureBroker(BrokerService brokerService) {
        // master and slave survive db restart and retain master/slave status
        LeaseLockerIOExceptionHandler stopConnectors = new LeaseLockerIOExceptionHandler();
        brokerService.setIoExceptionHandler(stopConnectors);
    }

    /**
     * Sleeps for {@link #restartDelay} milliseconds before the caller brings the database back.
     * If the sleep is interrupted the interrupt status is restored and the method returns
     * early, so the caller still proceeds with the restart.
     */
    @Override
    protected void delayTillRestartRequired() {
        if (restartDelay > 2000) {
            LOG.info("delay for more than lease quantum. While Db is offline, master should stay alive but could loose lease");
        } else {
            LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
        }
        try {
            TimeUnit.MILLISECONDS.sleep(restartDelay);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread instead of dropping it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while delaying the db restart", e);
        }
    }

    /**
     * Asserts that the send connection is attached to the master when that is deterministic.
     *
     * @param inflightMessageCount number of messages sent so far
     */
    @Override
    protected void verifyExpectedBroker(int inflightMessageCount) {
        // NOTE(review): restartDelay is only ever 2000 or 10000 in this class, so the
        // "restartDelay <= 500" clause can never hold and only the "== 0" case asserts.
        if (inflightMessageCount == 0 || (inflightMessageCount == failureCount + 10 && restartDelay <= 500)) {
            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
        } else {
            // lease expired while DB was offline, either or master/slave can grab it so assert is not deterministic
            // but we still need to validate sent == received
        }
    }

    /** Resets {@link #restartDelay} to 2000 ms before delegating to the parent set-up. */
    @Override
    public void setUp() throws Exception {
        restartDelay = 2000;
        super.setUp();
    }

    /**
     * Raises the restart delay to 10 seconds and runs {@link #testSendReceive()}.
     * NOTE(review): that call dispatches to the no-op override below, so this test currently
     * exercises nothing; left as-is because the override is tied to AMQ-4975.
     */
    public void testSendReceiveWithLeaseExpiry() throws Exception {
        restartDelay = 10000;
        testSendReceive();
    }

    // ignore this test case
    @Override
    public void testAdvisory() throws Exception {}

    @Override
    public void testSendReceive() throws Exception {
        // Ignore this test for now, see AMQ-4975
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java new file mode 100644 index 0000000..cf4929a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.broker.ft;

import java.io.IOException;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the DB-restart master/slave scenario with a {@link LeaseDatabaseLocker} in place of
 * the locker configured by the parent class.
 */
public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMasterSlaveTest {
    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseTest.class);

    /** Value passed to {@code setLockKeepAlivePeriod}. */
    private static final long LOCK_KEEP_ALIVE_PERIOD = 1000;

    /** Value passed to {@code setLockAcquireSleepInterval}. */
    private static final long LOCK_ACQUIRE_SLEEP_INTERVAL = 8000;

    /**
     * Applies the parent configuration, then swaps in a lease locker and sets its timings.
     *
     * @param persistenceAdapter the adapter to configure
     * @throws IOException propagated from the adapter/locker configuration calls
     */
    @Override
    protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
        super.configureJdbcPersistenceAdapter(persistenceAdapter);
        persistenceAdapter.setLocker(new LeaseDatabaseLocker());
        persistenceAdapter.getLocker().setLockAcquireSleepInterval(LOCK_ACQUIRE_SLEEP_INTERVAL);
        persistenceAdapter.setLockKeepAlivePeriod(LOCK_KEEP_ALIVE_PERIOD);
    }

    /**
     * Installs a {@link LeaseLockerIOExceptionHandler} that does not ignore SQL exceptions
     * and does not stop/start connectors.
     *
     * @param brokerService the broker (master or slave) being configured
     */
    @Override
    protected void configureBroker(BrokerService brokerService) {
        //let the brokers die on exception and master should have lease on restart
        // which will delay slave start till it expires
        LeaseLockerIOExceptionHandler handler = new LeaseLockerIOExceptionHandler();
        handler.setIgnoreSQLExceptions(false);
        handler.setStopStartConnectors(false);
        handler.setResumeCheckSleepPeriod(500L);
        brokerService.setIoExceptionHandler(handler);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java new file mode 100644 index 0000000..fb04803 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.broker.ft;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Master/slave JDBC test that shuts the shared Derby database down once
 * {@code failureCount} messages have been sent and restarts it from a background thread.
 * Receipts are committed in a transacted session; when a commit outcome is "in doubt" the
 * test asks the store whether the message is still there to decide if it counts as consumed.
 */
public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);

    /**
     * Called after every send. Verifies the connected broker before and after counting the
     * message; on reaching {@code failureCount} it shuts the database down and starts a
     * thread that restarts it after {@link #delayTillRestartRequired()} returns.
     */
    @Override
    protected void messageSent() throws Exception {
        verifyExpectedBroker(inflightMessageCount);
        if (++inflightMessageCount == failureCount) {
            LOG.info("STOPPING DB!@!!!!");
            final EmbeddedDataSource ds = ((SyncCreateDataSource)getExistingDataSource()).getDelegate();
            ds.setShutdownDatabase("shutdown");
            LOG.info("DB STOPPED!@!!!!");

            Thread dbRestartThread = new Thread("db-re-start-thread") {
                @Override
                public void run() {
                    delayTillRestartRequired();
                    ds.setShutdownDatabase("false");
                    LOG.info("DB RESTARTED!@!!!!");
                }
            };
            dbRestartThread.start();
        }
        verifyExpectedBroker(inflightMessageCount);
    }

    /**
     * Asserts which broker the send connection is attached to: the master before any message
     * is counted, the slave once {@code failureCount + 10} messages have been sent.
     *
     * @param inflightMessageCount number of messages sent so far
     */
    protected void verifyExpectedBroker(int inflightMessageCount) {
        if (inflightMessageCount == 0) {
            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
        } else if (inflightMessageCount == failureCount + 10) {
            assertEquals("connected to slave, count:" + inflightMessageCount, slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
        }
    }

    /** Blocks the DB-restart thread until the master broker has stopped. */
    protected void delayTillRestartRequired() {
        LOG.info("Waiting for master broker to Stop");
        master.waitUntilStopped();
    }

    /** Sends {@code message} to {@code producerDestination} with a single plain send. */
    protected void sendToProducer(MessageProducer producer,
                                  Destination producerDestination, Message message) throws JMSException {
        producer.send(producerDestination, message);
    }

    /** Creates a transacted receive session so that receipts are committed explicitly. */
    @Override
    protected Session createReceiveSession(Connection receiveConnection) throws Exception {
        return receiveConnection.createSession(true, Session.SESSION_TRANSACTED);
    }

    /**
     * Commits the receive session and records the message. If the commit fails the session is
     * rolled back; for an "in doubt" {@link TransactionRolledBackException} the store is
     * queried and the message is recorded only when it is no longer there.
     *
     * @param message     the message just received
     * @param messageList the list the parent implementation records consumed messages in
     */
    @Override
    protected void consumeMessage(Message message, List<Message> messageList) {
        try {
            receiveSession.commit();
            super.consumeMessage(message, messageList);
        } catch (JMSException e) {
            LOG.info("Failed to commit message receipt: " + message, e);
            try {
                receiveSession.rollback();
            } catch (JMSException ignored) {
                // best effort: the commit failure above is the interesting error
            }

            if (isInDoubtRollback(e)) {
                // failover chucked bc there is a missing reply to a commit.
                // failover is involved b/c the store exception is handled broker side and the client just
                // sees a disconnect (socket.close()).
                // If the client needs to be aware of the failure then it should not use IOExceptionHandler
                // so that the exception will propagate back

                // for this test case:
                // the commit may have got there and the reply is lost "or" the commit may be lost.
                // so we may or may not get a resend.
                //
                // At the application level we need to determine if the message is there or not which is not trivial
                // for this test we assert received == sent
                // so we need to know whether the message will be replayed.
                // we can ask the store b/c we know it is jdbc - guess we could go through a destination
                // message store interface also or use jmx
                try {
                    ActiveMQMessage mqMessage = (ActiveMQMessage) message;
                    if (!isStillInStore(mqMessage)) {
                        // message is gone, so lets count it as consumed
                        LOG.info("On TransactionRolledBackException we know that the ack/commit got there b/c message is gone so we count it: " + mqMessage);
                        super.consumeMessage(message, messageList);
                    } else {
                        LOG.info("On TransactionRolledBackException we know that the ack/commit was lost so we expect a replay of: " + mqMessage);
                    }
                } catch (Exception dbe) {
                    LOG.warn("Failed to determine whether the message is still in the store: " + message, dbe);
                }
            }
        }
    }

    /** Returns true when {@code e} is a rolled-back-transaction error whose text mentions "in doubt". */
    private static boolean isInDoubtRollback(JMSException e) {
        if (!(e instanceof TransactionRolledBackException)) {
            return false;
        }
        String text = e.getMessage();
        // A null message used to cause a NullPointerException here.
        return text != null && text.indexOf("in doubt") != -1;
    }

    /**
     * Runs the adapter's find-message statement for {@code mqMessage} against the shared data
     * source and reports whether a row came back. All JDBC resources that were opened are
     * closed, and close failures are logged rather than masking the result.
     */
    private boolean isStillInStore(ActiveMQMessage mqMessage) throws Exception {
        MessageId id = mqMessage.getMessageId();
        java.sql.Connection dbConnection = null;
        PreparedStatement statement = null;
        ResultSet rs = null;
        try {
            dbConnection = sharedDs.getConnection();
            statement = dbConnection.prepareStatement(((JDBCPersistenceAdapter) connectedToBroker().getPersistenceAdapter()).getStatements().getFindMessageStatement());
            statement.setString(1, id.getProducerId().toString());
            statement.setLong(2, id.getProducerSequenceId());
            rs = statement.executeQuery();
            return rs.next();
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException closeFailure) {
                    LOG.warn("Failed to close result set", closeFailure);
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException closeFailure) {
                    LOG.warn("Failed to close statement", closeFailure);
                }
            }
            // dbConnection stays null when getConnection() itself failed.
            if (dbConnection != null) {
                try {
                    dbConnection.close();
                } catch (SQLException closeFailure) {
                    LOG.warn("Failed to close db connection", closeFailure);
                }
            }
        }
    }

    /** Picks the broker whose name matches the receive connection's broker info ("master" or the slave). */
    private BrokerService connectedToBroker() {
        return ((ActiveMQConnection)receiveConnection).getBrokerInfo().getBrokerName().equals("master") ? master : slave.get();
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java new file mode 100644 index 0000000..9b21c44 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.broker.ft;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single-broker queue send/receive test that shuts the embedded Derby database down after
 * {@link #failureCount} sends and restarts it from a background thread, while the producer
 * keeps retrying failed sends over a failover transport.
 */
public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnectionsTest implements ExceptionListener {
    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueTest.class);

    public boolean transactedSends = false;
    public int failureCount = 25;  // or 20 for even tx batch boundary

    int inflightMessageCount = 0;
    EmbeddedDataSource sharedDs;
    BrokerService broker;
    /** Never counted down in this class; awaited with a timeout as the restart delay. */
    final CountDownLatch restartDBLatch = new CountDownLatch(1);

    /** Creates the data source, then configures and starts a JDBC-backed broker before the parent set-up. */
    @Override
    protected void setUp() throws Exception {
        setAutoFail(true);
        topic = false;
        verbose = true;
        // startup db
        sharedDs = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());

        broker = new BrokerService();

        DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
        handler.setIgnoreSQLExceptions(false);
        handler.setStopStartConnectors(true);
        broker.setIoExceptionHandler(handler);
        broker.addConnector("tcp://localhost:0");
        broker.setUseJmx(false);
        broker.setPersistent(true);
        broker.setDeleteAllMessagesOnStartup(true);
        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
        persistenceAdapter.setDataSource(sharedDs);
        persistenceAdapter.setUseLock(false);
        persistenceAdapter.setLockKeepAlivePeriod(500);
        persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
        broker.setPersistenceAdapter(persistenceAdapter);
        broker.start();
        super.setUp();
    }

    /** Runs the parent tear-down and always stops the broker, even if the parent tear-down throws. */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    /** Creates a transacted or auto-acknowledge send session depending on {@link #transactedSends}. */
    protected Session createSendSession(Connection sendConnection) throws Exception {
        if (transactedSends) {
            return sendConnection.createSession(true, Session.SESSION_TRANSACTED);
        } else {
            return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
    }

    /** Builds a failover connection factory pointing at the broker's first transport connector. */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory f =
                new ActiveMQConnectionFactory("failover://" + broker.getTransportConnectors().get(0).getPublishableConnectString());
        f.setExceptionListener(this);
        return f;
    }

    /**
     * Called after every send; on the {@link #failureCount}-th message shuts the database
     * down and starts a thread that re-enables it after waiting up to 10 seconds.
     */
    @Override
    protected void messageSent() throws Exception {
        if (++inflightMessageCount == failureCount) {
            LOG.info("STOPPING DB!@!!!!");
            final EmbeddedDataSource ds = sharedDs;
            ds.setShutdownDatabase("shutdown");
            try {
                ds.getConnection();
            } catch (Exception ignored) {
                // failure is deliberately ignored; the call is only made for its effect on the db
            }
            LOG.info("DB STOPPED!@!!!!");

            Thread dbRestartThread = new Thread("db-re-start-thread") {
                @Override
                public void run() {
                    LOG.info("Sleeping for 10 seconds before allowing db restart");
                    try {
                        restartDBLatch.await(10, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        // Restore the flag; the db is still restarted below.
                        Thread.currentThread().interrupt();
                        LOG.warn("Interrupted while waiting to restart the db", e);
                    }
                    ds.setShutdownDatabase("false");
                    LOG.info("DB RESTARTED!@!!!!");
                }
            };
            dbRestartThread.start();
        }
    }

    /**
     * Sends {@code message}, retrying every 2 seconds until a send (and, for transacted
     * sends on a batch boundary, the commit) succeeds.
     */
    protected void sendToProducer(MessageProducer producer,
                                  Destination producerDestination, Message message) throws JMSException {
        // do some retries as db failures filter back to the client until broker sees
        // db lock failure and shuts down
        boolean sent = false;
        do {
            try {
                producer.send(producerDestination, message);

                if (transactedSends && ((inflightMessageCount+1) %10 == 0 || (inflightMessageCount+1) >= messageCount)) {
                    LOG.info("committing on send: " + inflightMessageCount + " message: " + message);
                    session.commit();
                }

                sent = true;
            } catch (JMSException e) {
                LOG.info("Exception on producer send:", e);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ignored) {
                    // deliberately ignored: the loop keeps retrying until the send succeeds
                }
            }
        } while(!sent);
    }

    /** Logs asynchronous connection errors reported by the JMS provider. */
    @Override
    public void onException(JMSException exception) {
        LOG.error("exception on connection: ", exception);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java new file mode 100644 index 0000000..c7b0ec6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.broker.ft;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;

import javax.sql.DataSource;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;

/**
 * Master/slave queue test in which both brokers use a {@link JDBCPersistenceAdapter} over
 * one shared data source; the slave is started on its own thread.
 */
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
    protected DataSource sharedDs;
    protected String MASTER_URL = "tcp://localhost:62001";
    protected String SLAVE_URL  = "tcp://localhost:62002";

    /** Creates the shared data source, then runs the parent set-up. */
    @Override
    protected void setUp() throws Exception {
        // startup db
        sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
        super.setUp();
    }

    /** Builds and starts the broker named "master" on {@link #MASTER_URL}. */
    @Override
    protected void createMaster() throws Exception {
        master = new BrokerService();
        master.setBrokerName("master");
        master.addConnector(MASTER_URL);
        master.setUseJmx(false);
        master.setPersistent(true);
        master.setDeleteAllMessagesOnStartup(true);

        JDBCPersistenceAdapter jdbcAdapter = new JDBCPersistenceAdapter();
        jdbcAdapter.setDataSource(getExistingDataSource());
        configureJdbcPersistenceAdapter(jdbcAdapter);
        master.setPersistenceAdapter(jdbcAdapter);

        configureBroker(master);
        master.start();
    }

    /**
     * Installs a {@link DefaultIOExceptionHandler} that does not ignore SQL exceptions.
     *
     * @param brokerService the broker (master or slave) being configured
     */
    protected void configureBroker(BrokerService brokerService) {
        // we want any store io exception to stop the broker
        DefaultIOExceptionHandler ioHandler = new DefaultIOExceptionHandler();
        ioHandler.setIgnoreSQLExceptions(false);
        brokerService.setIoExceptionHandler(ioHandler);
    }

    /**
     * Starts the broker named "slave" on a separate thread and, once it is up, publishes it
     * through {@code slave} and counts {@code slaveStarted} down.
     */
    @Override
    protected void createSlave() throws Exception {
        // use a separate thread as the slave will block waiting for
        // the exclusive db lock
        Runnable slaveStartup = new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerService slaveBroker = new BrokerService();
                    slaveBroker.setBrokerName("slave");
                    TransportConnector slaveConnector = new TransportConnector();
                    slaveConnector.setUri(new URI(SLAVE_URL));
                    slaveBroker.addConnector(slaveConnector);
                    // no need for broker.setMasterConnectorURI(masterConnectorURI)
                    // as the db lock provides the slave/master initialisation
                    slaveBroker.setUseJmx(false);
                    slaveBroker.setPersistent(true);
                    JDBCPersistenceAdapter jdbcAdapter = new JDBCPersistenceAdapter();
                    jdbcAdapter.setDataSource(getExistingDataSource());
                    jdbcAdapter.setCreateTablesOnStartup(false);
                    slaveBroker.setPersistenceAdapter(jdbcAdapter);
                    configureJdbcPersistenceAdapter(jdbcAdapter);
                    configureBroker(slaveBroker);
                    slaveBroker.start();
                    slave.set(slaveBroker);
                    slaveStarted.countDown();
                } catch (IllegalStateException expectedOnShutdown) {
                    // nothing to do: expected when the test shuts down first
                } catch (Exception e) {
                    // NOTE(review): this runs on the slave thread, so the failure raised here
                    // does not reach the test thread.
                    fail("failed to start slave broker, reason:" + e);
                }
            }
        };
        new Thread(slaveStartup).start();
    }

    /**
     * Sets the lock keep-alive period and the locker's acquire sleep interval to 500.
     *
     * @param persistenceAdapter the adapter to configure
     * @throws IOException declared for overriding classes and the locker lookup
     */
    protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
        persistenceAdapter.setLockKeepAlivePeriod(500);
        persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
    }

    /** Returns the data source created in {@link #setUp()}. */
    protected DataSource getExistingDataSource() throws Exception {
        return sharedDs;
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java new file mode 100644 index 0000000..7dc88f7 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.broker.ft;

import java.io.File;
import java.net.URI;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.leveldb.LevelDBStore;
import org.junit.Ignore;


/**
 * Master/slave test in which both brokers use a {@link LevelDBStore} in the same "shared"
 * directory and listen on the same URL, so clients connect through a single-URI failover
 * string.
 */
public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTestSupport {
    private final String brokerUrl = "tcp://localhost:62001";
    private final String singleUriString = "failover://(" + brokerUrl +")?randomize=false";

    /** Enables auto-fail and runs the parent set-up. */
    @Override
    protected void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
    }

    /** Connects through the single-URI failover string instead of the two-broker URI. */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(singleUriString);
    }

    /** Builds and starts the broker named "shared-master". */
    @Override
    protected void createMaster() throws Exception {
        master = new BrokerService();
        master.setBrokerName("shared-master");
        configureSharedPersistenceAdapter(master);
        master.addConnector(brokerUrl);
        master.start();
    }

    /** Gives {@code broker} a LevelDB store located in the "shared" directory. */
    private void configureSharedPersistenceAdapter(BrokerService broker) throws Exception {
        LevelDBStore sharedStore = new LevelDBStore();
        sharedStore.setDirectory(new File("shared"));
        broker.setPersistenceAdapter(sharedStore);
    }

    /**
     * Starts the broker named "shared-slave" on its own thread and, once it is up, publishes
     * it through {@code slave} and counts {@code slaveStarted} down.
     */
    @Override
    protected void createSlave() throws Exception {
        Thread slaveStarter = new Thread() {
            @Override
            public void run() {
                try {
                    BrokerService slaveBroker = new BrokerService();
                    slaveBroker.setBrokerName("shared-slave");
                    configureSharedPersistenceAdapter(slaveBroker);
                    // add transport as a service so that it is bound on start, after store started
                    final TransportConnector slaveConnector = new TransportConnector();
                    slaveConnector.setUri(new URI(brokerUrl));
                    slaveBroker.addConnector(slaveConnector);

                    slaveBroker.start();
                    slave.set(slaveBroker);
                    slaveStarted.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        slaveStarter.start();
    }


    // The @Ignore is just here for documentation, since this is a JUnit3 test
    // I added the sleep because without it the two other test cases fail. I haven't looked into it, but
    // my guess whatever setUp does isn't really finished when the teardown runs.
    @Ignore("See https://issues.apache.org/jira/browse/AMQ-5164")
    @Override
    public void testAdvisory() throws Exception {
        // The parent's testAdvisory() is intentionally not invoked; only the pause remains.
        Thread.sleep(5000L);
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java new file mode 100644 index 0000000..2808ebe --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.broker.ft;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;

/**
 * Test failover for Queues.
 *
 * <p>Starts a master and a slave broker, connects through a failover URI and stops the master
 * once {@link #failureCount} messages have been sent. Subclasses customise how the two
 * brokers are created.
 */
abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWithTwoConnectionsTest {
    private static final transient Logger LOG = LoggerFactory.getLogger(QueueMasterSlaveTestSupport.class);

    protected BrokerService master;
    /** Set by {@link #createSlave()} once the slave broker has started. */
    protected AtomicReference<BrokerService> slave = new AtomicReference<BrokerService>();
    /** Counted down by {@link #createSlave()} once the slave broker has started. */
    protected CountDownLatch slaveStarted = new CountDownLatch(1);
    protected int inflightMessageCount;
    protected int failureCount = 50;
    protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";

    /**
     * Configures the run (10 minute limit, auto-fail, 500 messages, failure at the half-way
     * point), creates master and slave, pauses one second and then runs the parent set-up.
     */
    @Override
    protected void setUp() throws Exception {
        setMaxTestTime(TimeUnit.MINUTES.toMillis(10));
        setAutoFail(true);
        if (System.getProperty("basedir") == null) {
            File file = new File(".");
            System.setProperty("basedir", file.getAbsolutePath());
        }
        super.messageCount = 500;
        failureCount = super.messageCount / 2;
        super.topic = isTopic();
        createMaster();
        createSlave();
        // wait for thing to connect
        Thread.sleep(1000);
        super.setUp();
    }

    /** Classpath location of the slave broker's XML configuration. */
    protected String getSlaveXml() {
        return "org/apache/activemq/broker/ft/slave.xml";
    }

    /** Classpath location of the master broker's XML configuration. */
    protected String getMasterXml() {
        return "org/apache/activemq/broker/ft/master.xml";
    }

    /**
     * Runs the parent tear-down, stops the master, waits up to 60 seconds for the slave to
     * have started and stops it too. The brokers are stopped even when an earlier step
     * throws, so a failing tear-down does not leave brokers running for the next test.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            try {
                if (master != null) {
                    master.stop();
                    master.waitUntilStopped();
                }
            } finally {
                // The slave may only come up after the master has gone, so give it time
                // before looking it up.
                slaveStarted.await(60, TimeUnit.SECONDS);
                BrokerService brokerService = slave.get();
                if (brokerService != null) {
                    brokerService.stop();
                }
            }
        }
    }

    /** Connects through the master/slave failover URI. */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(uriString);
    }

    /** Called after every send; stops the master on the {@link #failureCount}-th message. */
    @Override
    protected void messageSent() throws Exception {
        if (++inflightMessageCount == failureCount) {
            Thread.sleep(1000);
            LOG.error("MASTER STOPPED!@!!!!");
            master.stop();
        }
    }

    /** @return {@code false}: these tests run against queues */
    protected boolean isTopic() {
        return false;
    }

    /** Builds the master broker from {@link #getMasterXml()} and starts it. */
    protected void createMaster() throws Exception {
        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
        brokerFactory.afterPropertiesSet();
        master = brokerFactory.getBroker();
        master.start();
    }

    /** Builds the slave broker from {@link #getSlaveXml()}, starts it and publishes it. */
    protected void createSlave() throws Exception {
        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
        brokerFactory.afterPropertiesSet();
        BrokerService broker = brokerFactory.getBroker();
        broker.start();
        slave.set(broker);
        slaveStarted.countDown();
    }

    /**
     * Stops the master, waits for the slave and checks that a message published to a virtual
     * topic afterwards arrives on the matching consumer queue.
     */
    public void testVirtualTopicFailover() throws Exception {

        MessageConsumer qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
        assertNull("No message there yet", qConsumer.receive(1000));
        qConsumer.close();
        assertTrue(!master.isSlave());
        master.stop();
        assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
        assertTrue(!slave.get().isSlave());

        final String text = "ForUWhenSlaveKicksIn";
        producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));

        qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));

        javax.jms.Message message = qConsumer.receive(4000);
        assertNotNull("Get message after failover", message);
        assertEquals("correct message", text, ((TextMessage)message).getText());
    }

    /** Stops the master and expects a master-broker advisory once the slave has started. */
    public void testAdvisory() throws Exception {
        MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());

        master.stop();
        assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
        LOG.info("slave started");
        Message advisoryMessage = advConsumer.receive(5000);
        LOG.info("received " + advisoryMessage);
        assertNotNull("Didn't received advisory", advisoryMessage);

    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java new file mode 100644 index 0000000..c9f178d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.ft; + +public class QueueMasterSlaveTestUsingSharedFileTest extends + QueueMasterSlaveTestSupport { + + protected String getSlaveXml() { + return "org/apache/activemq/broker/ft/sharedFileSlave.xml"; + } + + protected String getMasterXml() { + return "org/apache/activemq/broker/ft/sharedFileMaster.xml"; + } + + protected void createSlave() throws Exception { + // Start the Brokers async since starting them up could be a blocking operation.. + new Thread(new Runnable() { + public void run() { + try { + QueueMasterSlaveTestUsingSharedFileTest.super.createSlave(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + }).start(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java new file mode 100644 index 0000000..5331a22 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.ft; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.logging.Logger; +import javax.sql.DataSource; +import org.apache.derby.jdbc.EmbeddedDataSource; + +// prevent concurrent calls from attempting to create the db at the same time +// can result in "already exists in this jvm" errors + +public class SyncCreateDataSource implements DataSource { + final EmbeddedDataSource delegate; + + SyncCreateDataSource(EmbeddedDataSource dataSource) { + this.delegate = dataSource; + } + + @Override + public Connection getConnection() throws SQLException { + synchronized (this) { + return delegate.getConnection(); + } + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + synchronized (this) { + return delegate.getConnection(); + } + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return null; + } + + @Override + public void setLogWriter(PrintWriter out) throws SQLException { + } + + @Override + public int getLoginTimeout() throws SQLException { + return 0; + } + + @Override + public void setLoginTimeout(int seconds) throws SQLException { + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean 
isWrapperFor(Class<?> iface) throws SQLException { + return false; + } + + EmbeddedDataSource getDelegate() { + return delegate; + } + + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java new file mode 100644 index 0000000..ee7ca0f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.ft; + +import java.io.IOException; +import java.net.URI; +import javax.sql.DataSource; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; +import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; +import org.apache.activemq.store.jdbc.Statements; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.apache.activemq.util.IOHelper; +import org.apache.derby.jdbc.EmbeddedDataSource; + +public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSupport { + protected DataSource sharedDs; + protected String MASTER_URL = "tcp://localhost:62001"; + protected String SLAVE_URL = "tcp://localhost:62002"; + + protected void setUp() throws Exception { + // startup db + sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory())); + super.setUp(); + } + + protected void createMaster() throws Exception { + master = new BrokerService(); + master.setBrokerName("master"); + master.addConnector(MASTER_URL); + master.setUseJmx(false); + master.setPersistent(true); + master.setDeleteAllMessagesOnStartup(true); + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter(); + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setCreateTablesOnStartup(true); + leaseDatabaseLocker.setDataSource(getExistingDataSource()); + leaseDatabaseLocker.setStatements(new Statements()); + kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker); + configureLocker(kahaDBPersistenceAdapter); + configureBroker(master); + master.start(); + } + + protected void configureBroker(BrokerService brokerService) { + DefaultIOExceptionHandler stopBrokerOnStoreException = new 
DefaultIOExceptionHandler(); + // we want any store io exception to stop the broker + stopBrokerOnStoreException.setIgnoreSQLExceptions(false); + brokerService.setIoExceptionHandler(stopBrokerOnStoreException); + } + + protected void createSlave() throws Exception { + // use a separate thread as the slave will block waiting for + // the exclusive db lock + Thread t = new Thread() { + public void run() { + try { + BrokerService broker = new BrokerService(); + broker.setBrokerName("slave"); + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI(SLAVE_URL)); + broker.addConnector(connector); + broker.setUseJmx(false); + broker.setPersistent(true); + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setDataSource(getExistingDataSource()); + leaseDatabaseLocker.setStatements(new Statements()); + kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker); + configureLocker(kahaDBPersistenceAdapter); + configureBroker(broker); + broker.start(); + slave.set(broker); + slaveStarted.countDown(); + } catch (IllegalStateException expectedOnShutdown) { + } catch (Exception e) { + fail("failed to start slave broker, reason:" + e); + } + } + }; + t.start(); + } + + protected void configureLocker(KahaDBPersistenceAdapter kahaDBPersistenceAdapter) throws IOException { + kahaDBPersistenceAdapter.setLockKeepAlivePeriod(500); + kahaDBPersistenceAdapter.getLocker().setLockAcquireSleepInterval(500); + } + + @Override + public void testVirtualTopicFailover() throws Exception { + // Ignoring for now, see AMQ-4842 + } + + protected DataSource getExistingDataSource() throws Exception { + return sharedDs; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java 
---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java new file mode 100644 index 0000000..ad9cca1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.ft; + +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; + +public class mKahaDbQueueMasterSlaveTest extends QueueMasterSlaveTestSupport { + protected String MASTER_URL = "tcp://localhost:62001"; + protected String SLAVE_URL = "tcp://localhost:62002"; + + protected void createMaster() throws Exception { + master = new BrokerService(); + master.setBrokerName("master"); + master.addConnector(MASTER_URL); + master.setUseJmx(false); + master.setPersistent(true); + master.setDeleteAllMessagesOnStartup(true); + + MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter(); + List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>(); + FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter(); + defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + defaultEntry.setPerDestination(true); + adapters.add(defaultEntry); + + mKahaDB.setFilteredPersistenceAdapters(adapters); + master.setPersistenceAdapter(mKahaDB); + + master.start(); + } + + protected void createSlave() throws Exception { + // use a separate thread as the slave will block waiting for + // the exclusive db lock + Thread t = new Thread() { + public void run() { + try { + BrokerService broker = new BrokerService(); + broker.setBrokerName("slave"); + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI(SLAVE_URL)); + broker.addConnector(connector); + // no need for broker.setMasterConnectorURI(masterConnectorURI) + // as the db lock provides the slave/master initialisation + broker.setUseJmx(false); + broker.setPersistent(true); + + 
MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter(); + List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>(); + FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter(); + defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + defaultEntry.setPerDestination(true); + adapters.add(defaultEntry); + + mKahaDB.setFilteredPersistenceAdapters(adapters); + broker.setPersistenceAdapter(mKahaDB); + broker.start(); + slave.set(broker); + slaveStarted.countDown(); + } catch (IllegalStateException expectedOnShutdown) { + } catch (Exception e) { + fail("failed to start slave broker, reason:" + e); + } + } + }; + t.start(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml new file mode 100644 index 0000000..f5eef67 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> + + <broker brokerName="shared" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core"> + <transportConnectors> + <transportConnector uri="tcp://localhost:62001"/> + </transportConnectors> + + </broker> + +</beans> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml new file mode 100644 index 0000000..7c88a8c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> + + <broker brokerName="shared" useJmx="false" deleteAllMessagesOnStartup="false" xmlns="http://activemq.apache.org/schema/core"> + <transportConnectors> + <transportConnector uri="tcp://localhost:62002"/> + </transportConnectors> + + + </broker> + +</beans> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java new file mode 100644 index 0000000..6910bed --- 
/dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import static org.junit.Assert.*; + +import java.io.File; +import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used to verify that the BrokerView accessed while the BrokerSerivce is waiting + * for a Slow Store startup to complete doesn't throw unexpected NullPointerExceptions. 
+ */ +public class BrokerViewSlowStoreStartupTest { + + private static final Logger LOG = LoggerFactory.getLogger(BrokerViewSlowStoreStartupTest.class); + + private final CountDownLatch holdStoreStart = new CountDownLatch(1); + private final String brokerName = "brokerViewTest"; + + private BrokerService broker; + private Thread startThread; + + private BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(brokerName); + + KahaDBStore kaha = new KahaDBStore() { + + @Override + public void start() throws Exception { + LOG.info("Test KahaDB class is waiting for signal to complete its start()"); + holdStoreStart.await(); + super.start(); + LOG.info("Test KahaDB class is completed its start()"); + } + }; + + kaha.setDirectory(new File("target/activemq-data/kahadb")); + kaha.deleteAllMessages(); + + broker.setPersistenceAdapter(kaha); + broker.setUseJmx(true); + + return broker; + } + + @Before + public void setUp() throws Exception { + broker = createBroker(); + + startThread = new Thread(new Runnable() { + + @Override + public void run() { + try { + broker.start(); + } catch(Exception e) { + e.printStackTrace(); + } + } + }); + startThread.start(); + } + + @After + public void tearDown() throws Exception { + + // ensure we don't keep the broker held if an exception occurs somewhere. + holdStoreStart.countDown(); + + startThread.join(); + + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test(timeout=120000) + public void testBrokerViewOnSlowStoreStart() throws Exception { + + // Ensure we have an Admin View. 
+ assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return (broker.getAdminView()) != null; + } + })); + + final BrokerView view = broker.getAdminView(); + + try { + view.getBrokerName(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getBrokerId(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTotalEnqueueCount(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTotalDequeueCount(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTotalConsumerCount(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTotalProducerCount(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTotalMessageCount(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTotalMessagesCached(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.resetStatistics(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.enableStatistics(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.disableStatistics(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.isStatisticsEnabled(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTopics(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getQueues(); + 
fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTemporaryTopics(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTemporaryQueues(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTopicSubscribers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getDurableTopicSubscribers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getQueueSubscribers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTemporaryTopicSubscribers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTemporaryQueueSubscribers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getInactiveDurableTopicSubscribers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTopicProducers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getQueueProducers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTemporaryTopicProducers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getTemporaryQueueProducers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.getDynamicDestinationProducers(); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.removeConnector("tcp"); + fail("Should have thrown an 
NoSuchElementException"); + } catch(NoSuchElementException e) { + } + + try { + view.removeNetworkConnector("tcp"); + fail("Should have thrown an NoSuchElementException"); + } catch(NoSuchElementException e) { + } + + try { + view.addTopic("TEST"); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.addQueue("TEST"); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.removeTopic("TEST"); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.removeQueue("TEST"); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.createDurableSubscriber("1", "2", "3","4"); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + try { + view.destroyDurableSubscriber("1", "2"); + fail("Should have thrown an IllegalStateException"); + } catch(IllegalStateException e) { + } + + holdStoreStart.countDown(); + startThread.join(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return view.getBroker() != null; + } + }); + assertNotNull(view.getBroker()); + + try { + view.getBrokerName(); + } catch(Exception e) { + fail("caught an exception getting the Broker property: " + e.getClass().getName()); + } + + try { + view.getBrokerId(); + } catch(IllegalStateException e) { + fail("caught an exception getting the Broker property: " + e.getClass().getName()); + } + + try { + view.getTotalEnqueueCount(); + } catch(IllegalStateException e) { + fail("caught an exception getting the Broker property: " + e.getClass().getName()); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java 
---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java new file mode 100644 index 0000000..9998be9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.jmx; + +import java.util.List; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HealthViewMBeanTest extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class); + protected MBeanServer mbeanServer; + protected String domain = "org.apache.activemq"; + + @Override + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + useTopic = false; + super.setUp(); + mbeanServer = broker.getManagementContext().getMBeanServer(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(true); + answer.setDeleteAllMessagesOnStartup(true); + answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 64); + answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 64); + answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 64); + answer.setUseJmx(true); + answer.setSchedulerSupport(true); + + // allow options to be visible via jmx + + answer.addConnector(bindAddress); + return answer; + } + + public void 
testHealthView() throws Exception{ + Connection connection = connectionFactory.createConnection(); + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + for (int i = 0; i < 60; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[1024 *1024]); + producer.send(message); + } + + Thread.sleep(1000); + + String objectNameStr = broker.getBrokerObjectName().toString(); + objectNameStr += ",service=Health"; + ObjectName brokerName = assertRegisteredObjectName(objectNameStr); + HealthViewMBean health = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, HealthViewMBean.class, true); + List<HealthStatus> list = health.healthList(); + + for (HealthStatus status : list) { + LOG.info("Health status: {}", status); + } + + assertEquals(2, list.size()); + } + + protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { + ObjectName objectName = new ObjectName(name); + if (mbeanServer.isRegistered(objectName)) { + LOG.info("Bean Registered: " + objectName); + } else { + fail("Could not find MBean!: " + objectName); + } + return objectName; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java new file mode 100644 index 0000000..2c2b373 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java @@ -0,0 +1,200 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.jmx;

import java.util.List;

import javax.jms.ConnectionFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Test;
import org.slf4j.LoggerFactory;

/**
 * Exercises the broker's Log4J configuration MBean through a JMX proxy:
 * registration, listing loggers, and reading/changing logger levels.
 */
public class Log4JConfigTest extends EmbeddedBrokerTestSupport {

    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Log4JConfigTest.class);

    /** Name of the logger whose level the tests read and modify. */
    private static final String BROKER_LOGGER = "org.apache.activemq.broker.BrokerService";

    protected MBeanServer mbeanServer;
    protected String domain = "org.apache.activemq";

    /**
     * Binds the broker to an ephemeral TCP port, selects queue destinations and
     * captures the broker's MBean server once the broker has been set up.
     */
    @Override
    protected void setUp() throws Exception {
        bindAddress = "tcp://localhost:0";
        useTopic = false;
        super.setUp();
        mbeanServer = broker.getManagementContext().getMBeanServer();
    }

    /**
     * Creates a connection factory that targets the first transport connector
     * of the embedded broker.
     */
    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
    }

    /**
     * Builds the broker under test: persistent, wiped on startup, with JMX and
     * scheduler support enabled.
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        answer.setPersistent(true);
        answer.setDeleteAllMessagesOnStartup(true);
        answer.setUseJmx(true);
        answer.setSchedulerSupport(true);
        answer.addConnector(bindAddress);
        return answer;
    }

    /** The Log4J config view must be registered in the broker's MBean server. */
    @Test
    public void testLog4JConfigViewExists() throws Exception {
        String brokerObjectName = broker.getBrokerObjectName().toString();
        String log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName).toString();
        assertRegisteredObjectName(log4jConfigViewName);
    }

    /** The view must report a non-empty list of loggers. */
    @Test
    public void testLog4JConfigViewGetLoggers() throws Throwable {
        Log4JConfigViewMBean log4jConfigView = createLog4JConfigView();

        List<String> loggers = log4jConfigView.getLoggers();
        assertNotNull(loggers);
        assertFalse(loggers.isEmpty());
    }

    /** The view must report a non-empty level for the broker logger. */
    @Test
    public void testLog4JConfigViewGetLevel() throws Throwable {
        Log4JConfigViewMBean log4jConfigView = createLog4JConfigView();

        String level = log4jConfigView.getLogLevel(BROKER_LOGGER);
        assertNotNull(level);
        assertFalse(level.isEmpty());
    }

    /** An unknown logger name must report the root logger's level. */
    @Test
    public void testLog4JConfigViewGetLevelUnknownLoggerName() throws Throwable {
        Log4JConfigViewMBean log4jConfigView = createLog4JConfigView();

        // Non-existent loggers will report a level equal to the root level.
        String level = log4jConfigView.getLogLevel("not.a.logger");
        assertNotNull(level);
        assertFalse(level.isEmpty());
        assertEquals(Logger.getRootLogger().getLevel().toString(), level);
    }

    /** Setting a valid level through the view must be reflected when reading it back. */
    @Test
    public void testLog4JConfigViewSetLevel() throws Throwable {
        Log4JConfigViewMBean log4jConfigView = createLog4JConfigView();

        Logger brokerLogger = Logger.getLogger(BROKER_LOGGER);
        // May be null when the level is inherited; log4j 1.x accepts null on non-root loggers.
        Level originalLevel = brokerLogger.getLevel();
        try {
            String level = log4jConfigView.getLogLevel(BROKER_LOGGER);
            assertNotNull(level);
            assertFalse(level.isEmpty());

            log4jConfigView.setLogLevel(BROKER_LOGGER, "WARN");
            level = log4jConfigView.getLogLevel(BROKER_LOGGER);
            assertNotNull(level);
            assertEquals("WARN", level);

            log4jConfigView.setLogLevel(BROKER_LOGGER, "INFO");
            level = log4jConfigView.getLogLevel(BROKER_LOGGER);
            assertNotNull(level);
            assertEquals("INFO", level);
        } finally {
            // Logger levels are process-wide; do not leak the change into other tests.
            brokerLogger.setLevel(originalLevel);
        }
    }

    /** An unrecognised level name must leave the current level untouched. */
    @Test
    public void testLog4JConfigViewSetLevelNoChangeIfLevelIsBad() throws Throwable {
        Log4JConfigViewMBean log4jConfigView = createLog4JConfigView();

        Logger brokerLogger = Logger.getLogger(BROKER_LOGGER);
        // May be null when the level is inherited; log4j 1.x accepts null on non-root loggers.
        Level originalLevel = brokerLogger.getLevel();
        try {
            log4jConfigView.setLogLevel(BROKER_LOGGER, "INFO");
            String level = log4jConfigView.getLogLevel(BROKER_LOGGER);
            assertNotNull(level);
            assertEquals("INFO", level);

            log4jConfigView.setLogLevel(BROKER_LOGGER, "BAD");
            level = log4jConfigView.getLogLevel(BROKER_LOGGER);
            assertNotNull(level);
            assertEquals("INFO", level);
        } finally {
            // Logger levels are process-wide; do not leak the change into other tests.
            brokerLogger.setLevel(originalLevel);
        }
    }

    /** The root level reported by the view must match log4j's root logger. */
    @Test
    public void testLog4JConfigViewGetRootLogLevel() throws Throwable {
        Log4JConfigViewMBean log4jConfigView = createLog4JConfigView();

        String level = log4jConfigView.getRootLogLevel();
        assertNotNull(level);
        assertFalse(level.isEmpty());

        String currentRootLevel = Logger.getRootLogger().getLevel().toString();
        assertEquals(currentRootLevel, level);
    }

    /** Setting the root level through the view must change log4j's root logger. */
    @Test
    public void testLog4JConfigViewSetRootLevel() throws Throwable {
        Log4JConfigViewMBean log4jConfigView = createLog4JConfigView();

        Logger rootLogger = Logger.getRootLogger();
        Level originalLevel = rootLogger.getLevel();
        try {
            log4jConfigView.setRootLogLevel("WARN");
            assertEquals("WARN", rootLogger.getLevel().toString());

            log4jConfigView.setRootLogLevel("INFO");
            assertEquals("INFO", rootLogger.getLevel().toString());
        } finally {
            // The root level is process-wide; the original test left it at INFO for every later test.
            rootLogger.setLevel(originalLevel);
        }
    }

    /**
     * Fails the test unless an MBean is registered under the given name.
     *
     * @param name string form of the object name to look up
     * @return the parsed object name
     * @throws MalformedObjectNameException if {@code name} is not a valid object name
     * @throws NullPointerException if {@code name} is {@code null}
     */
    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
        ObjectName objectName = new ObjectName(name);
        if (mbeanServer.isRegistered(objectName)) {
            LOG.info("Bean Registered: {}", objectName);
        } else {
            fail("Could not find MBean!: " + objectName);
        }
        return objectName;
    }

    /**
     * Builds a JMX proxy for the Log4J config view of the broker under test.
     * Replaces the identical three-statement sequence each test used to repeat.
     */
    private Log4JConfigViewMBean createLog4JConfigView() throws Exception {
        String brokerObjectName = broker.getBrokerObjectName().toString();
        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
        return MBeanServerInvocationHandler.newProxyInstance(
            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
    }
}
