http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java 
b/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java
deleted file mode 100644
index 6de9021..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.test;
-
-import javax.jms.Topic;
-import java.io.File;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import java.lang.reflect.Array;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Useful base class for unit test cases
- */
-public abstract class TestSupport extends JmsTestBase {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TestSupport.class);
-
-    protected HedwigConnectionFactoryImpl connectionFactory;
-    protected boolean topic = true;
-
-    public TestSupport() {
-        super();
-    }
-
-    public TestSupport(String name) {
-        super(name);
-    }
-
-    /**
-     * Creates an MessageImpl.
-     *
-     * @return MessageImpl
-     */
-    protected MessageImpl createMessage() {
-        return new MessageImpl(null);
-    }
-
-    /**
-     * Creates a destination.
-     *
-     * @param subject - topic or queue name.
-     * @return Destination - either an Topic or ActiveMQQUeue.
-     */
-    protected Destination createDestination(String subject) {
-        if (topic) {
-            return SessionImpl.asTopic(subject);
-        } else {
-            return SessionImpl.asTopic(subject);
-        }
-    }
-
-    /**
-     * Tests if firstSet and secondSet are equal.
-     *
-     * @param messsage - string to be displayed when the assertion fails.
-     * @param firstSet[] - set of messages to be compared with its counterpart
-     *                in the secondset.
-     * @param secondSet[] - set of messages to be compared with its counterpart
-     *                in the firstset.
-     * @throws JMSException
-     */
-    protected void assertTextMessagesEqual(Message[] firstSet, Message[] 
secondSet) throws JMSException {
-        assertTextMessagesEqual("", firstSet, secondSet);
-    }
-
-    /**
-     * Tests if firstSet and secondSet are equal.
-     *
-     * @param messsage - string to be displayed when the assertion fails.
-     * @param firstSet[] - set of messages to be compared with its counterpart
-     *                in the secondset.
-     * @param secondSet[] - set of messages to be compared with its counterpart
-     *                in the firstset.
-     */
-    protected void assertTextMessagesEqual(String messsage, Message[] firstSet,
-                                           Message[] secondSet) throws 
JMSException {
-        assertEquals("Message count does not match: " + messsage, 
firstSet.length, secondSet.length);
-
-        for (int i = 0; i < secondSet.length; i++) {
-            TextMessage m1 = (TextMessage)firstSet[i];
-            TextMessage m2 = (TextMessage)secondSet[i];
-            assertTextMessageEqual("Message " + (i + 1) + " did not match : ", 
m1, m2);
-        }
-    }
-
-    /**
-     * Tests if m1 and m2 are equal.
-     *
-     * @param m1 - message to be compared with m2.
-     * @param m2 - message to be compared with m1.
-     * @throws JMSException
-     */
-    protected void assertEquals(TextMessage m1, TextMessage m2) throws 
JMSException {
-        assertEquals("", m1, m2);
-    }
-
-    /**
-     * Tests if m1 and m2 are equal.
-     *
-     * @param message - string to be displayed when the assertion fails.
-     * @param m1 - message to be compared with m2.
-     * @param m2 - message to be compared with m1.
-     */
-    protected void assertTextMessageEqual(String message, TextMessage m1, 
TextMessage m2) throws JMSException {
-        assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", 
m1 == null ^ m2 == null);
-
-        if (m1 == null) {
-            return;
-        }
-
-        assertEquals(message, m1.getText(), m2.getText());
-    }
-
-    /**
-     * Tests if m1 and m2 are equal.
-     *
-     * @param m1 - message to be compared with m2.
-     * @param m2 - message to be compared with m1.
-     * @throws JMSException
-     */
-    protected void assertEquals(Message m1, Message m2) throws JMSException {
-        assertEquals("", m1, m2);
-    }
-
-    /**
-     * Tests if m1 and m2 are equal.
-     *
-     * @param message - error message.
-     * @param m1 - message to be compared with m2.
-     * @param m2 -- message to be compared with m1.
-     */
-    protected void assertEquals(String message, Message m1, Message m2) throws 
JMSException {
-        assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", 
m1 == null ^ m2 == null);
-
-        if (m1 == null) {
-            return;
-        }
-
-        assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", 
m1.getClass() == m2.getClass());
-
-        if (m1 instanceof TextMessage) {
-            assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2);
-        } else {
-            assertEquals(message, m1, m2);
-        }
-    }
-
-    /**
-     * Test if base directory contains spaces
-     */
-    protected void assertBaseDirectoryContainsSpaces() {
-        assertFalse("Base directory cannot contain spaces.",
-                    new File(System.getProperty("basedir", 
".")).getAbsoluteFile().toString().contains(" "));
-    }
-
-    /**
-     * Creates an HedwigConnectionFactoryImpl.
-     *
-     * @return HedwigConnectionFactoryImpl
-     * @throws Exception
-     */
-    protected HedwigConnectionFactoryImpl createConnectionFactory() throws 
Exception {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    /**
-     * Factory method to create a new connection.
-     *
-     * @return connection
-     * @throws Exception
-     */
-    protected Connection createConnection() throws Exception {
-        return getConnectionFactory().createConnection();
-    }
-
-    /**
-     * Creates an ActiveMQ connection factory.
-     *
-     * @return connectionFactory
-     * @throws Exception
-     */
-    public HedwigConnectionFactoryImpl getConnectionFactory() throws Exception 
{
-        if (connectionFactory == null) {
-            connectionFactory = createConnectionFactory();
-            assertTrue("Should have created a connection factory!", 
connectionFactory != null);
-        }
-
-        return connectionFactory;
-    }
-
-    /**
-     * Returns the consumer subject.
-     *
-     * @return String
-     */
-    protected String getConsumerSubject() {
-        return getSubject();
-    }
-
-    /**
-     * Returns the producer subject.
-     *
-     * @return String
-     */
-    protected String getProducerSubject() {
-        return getSubject();
-    }
-
-    /**
-     * Returns the subject.
-     *
-     * @return String
-     */
-    protected String getSubject() {
-        return getClass().getName() + "." + getName();
-    }
-
-    protected void assertArrayEqual(String message, Object[] expected, 
Object[] actual) {
-        assertEquals(message + ". Array length", expected.length, 
actual.length);
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(message + ". element: " + i, expected[i], actual[i]);
-        }
-    }
-
-    protected void assertPrimitiveArrayEqual(String message, Object expected, 
Object actual) {
-        int length = Array.getLength(expected);
-        assertEquals(message + ". Array length", length, 
Array.getLength(actual));
-        for (int i = 0; i < length; i++) {
-            assertEquals(message + ". element: " + i, Array.get(expected, i), 
Array.get(actual, i));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
deleted file mode 100644
index 0bc8a8e..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.test.rollback;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DelegatingTransactionalMessageListener implements MessageListener 
{
-    private static final transient Logger LOG = 
LoggerFactory.getLogger(DelegatingTransactionalMessageListener.class);
-
-    private final MessageListener underlyingListener;
-    private boolean transacted = true;
-    private int ackMode = Session.AUTO_ACKNOWLEDGE;
-    private Session session;
-
-    public DelegatingTransactionalMessageListener(MessageListener 
underlyingListener,
-                                                  Connection connection, 
Destination destination) {
-        this.underlyingListener = underlyingListener;
-
-        try {
-            session = connection.createSession(transacted, ackMode);
-            MessageConsumer consumer = session.createConsumer(destination);
-            consumer.setMessageListener(this);
-        } catch (JMSException e) {
-            throw new IllegalStateException("Could not listen to " + 
destination, e);
-        }
-    }
-
-    public void onMessage(Message message) {
-        try {
-            underlyingListener.onMessage(message);
-            session.commit();
-        } catch (Throwable e) {
-            rollback();
-        }
-    }
-
-    private void rollback() {
-        try {
-            session.rollback();
-        } catch (JMSException e) {
-            LOG.error("Failed to rollback: " + e, e);
-        }
-    }
-
-    public Session getSession() {
-        return session;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
deleted file mode 100644
index 5bc1a07..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.usecases;
-
-import java.util.HashMap;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import org.apache.activemq.test.TestSupport;
-
-public class ChangeSentMessageTest extends TestSupport {
-    private static final int COUNT = 200;
-    private static final String VALUE_NAME = "value";
-
-    /**
-     * test Object messages can be changed after sending with no side-affects
-     *
-     * @throws Exception
-     */
-    public void testDoChangeSentMessage() throws Exception {
-        Destination destination = createDestination("test-" + 
ChangeSentMessageTest.class.getName());
-        Connection connection = createConnection();
-        connection.start();
-        Session consumerSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = consumerSession.createConsumer(destination);
-        Session publisherSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = 
publisherSession.createProducer(destination);
-        HashMap<String, Integer> map = new HashMap<String, Integer>();
-        ObjectMessage message = publisherSession.createObjectMessage();
-        for (int i = 0; i < COUNT; i++) {
-            map.put(VALUE_NAME, Integer.valueOf(i));
-            message.setObject(map);
-            producer.send(message);
-            assertTrue(message.getObject() == map);
-        }
-        for (int i = 0; i < COUNT; i++) {
-            ObjectMessage msg = (ObjectMessage)consumer.receive();
-            HashMap receivedMap = (HashMap)msg.getObject();
-            Integer intValue = (Integer)receivedMap.get(VALUE_NAME);
-            assertTrue(intValue.intValue() == i);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java
deleted file mode 100644
index b4da7a7..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.usecases;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.activemq.test.TestSupport;
-
-public class ChangeSessionDeliveryModeTest extends TestSupport implements 
MessageListener {
-
-    /**
-     * test following condition- which are defined by JMS Spec 1.1:
-     * MessageConsumers cannot use a MessageListener and receive() from the 
same
-     * session
-     *
-     * @throws Exception
-     */
-    public void testDoChangeSessionDeliveryMode() throws Exception {
-        Destination destination = createDestination("foo.bar");
-        Connection connection = createConnection();
-        connection.start();
-        Session consumerSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        /*
-        MessageConsumer consumer1 = 
consumerSession.createConsumer(destination);
-        consumer1.setMessageListener(this);
-        JMSException jmsEx = null;
-        MessageConsumer consumer2 = 
consumerSession.createConsumer(destination);
-
-        try {
-            consumer2.receive(10);
-            fail("Did not receive expected exception.");
-        } catch (JMSException e) {
-            assertTrue(e instanceof IllegalStateException);
-        }
-        */
-        MessageConsumer consumer1 = 
consumerSession.createConsumer(destination);
-        consumer1.setMessageListener(this);
-
-        try {
-            consumer1.receive(10);
-            fail("Did not receive expected exception.");
-        } catch (JMSException e) {
-            assertTrue(e instanceof IllegalStateException);
-        }
-    }
-
-    public void onMessage(Message msg) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java
deleted file mode 100644
index 7ea8cc0..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-
-
-import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompositeConsumeTest extends 
JmsTopicSendReceiveWithTwoConnectionsTest {
-    private static final Logger LOG = 
LoggerFactory.getLogger(CompositeConsumeTest.class);
-
-    public void testSendReceive() throws Exception {
-        messages.clear();
-
-        Destination[] destinations = getDestinations();
-        int destIdx = 0;
-
-        for (int i = 0; i < data.length; i++) {
-            Message message = session.createTextMessage(data[i]);
-
-            if (verbose) {
-                LOG.info("About to send a message: " + message + " with text: 
" + data[i]);
-            }
-
-            producer.send(destinations[destIdx], message);
-
-            if (++destIdx >= destinations.length) {
-                destIdx = 0;
-            }
-        }
-
-        assertMessagesAreReceived();
-    }
-
-    /**
-     * Returns the subscription subject
-     */
-    protected String getSubject() {
-        // return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y," + 
getPrefix() + "BAR.>";
-        return getPrefix() + "FOO.BAR";
-    }
-
-    /**
-     * Returns the destinations on which we publish
-     */
-    protected Destination[] getDestinations() {
-        return new Destination[]{SessionImpl.asTopic(getSubject())};
-    }
-
-    protected String getPrefix() {
-        return super.getSubject() + ".";
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
deleted file mode 100644
index 7833e01..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import org.apache.activemq.test.JmsSendReceiveTestSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompositePublishTest extends JmsSendReceiveTestSupport {
-    private static final Logger LOG = 
LoggerFactory.getLogger(CompositePublishTest.class);
-
-    protected Connection sendConnection;
-    protected Connection receiveConnection;
-    protected Session receiveSession;
-    protected MessageConsumer[] consumers;
-    protected List[] messageLists;
-
-    @SuppressWarnings("unchecked")
-    protected void setUp() throws Exception {
-        super.setUp();
-
-        connectionFactory = createConnectionFactory();
-
-        sendConnection = createConnection(false);
-        sendConnection.start();
-
-        receiveConnection = createConnection(false);
-        receiveConnection.start();
-
-        LOG.info("Created sendConnection: " + sendConnection);
-        LOG.info("Created receiveConnection: " + receiveConnection);
-
-        session = sendConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        receiveSession = receiveConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        LOG.info("Created sendSession: " + session);
-        LOG.info("Created receiveSession: " + receiveSession);
-
-        producer = session.createProducer(null);
-
-        LOG.info("Created producer: " + producer);
-
-        consumerDestination = session.createTopic(getConsumerSubject());
-        producerDestination = session.createTopic(getProducerSubject());
-
-        LOG.info("Created  consumer destination: " + consumerDestination
-                 + " of type: " + consumerDestination.getClass());
-        LOG.info("Created  producer destination: " + producerDestination
-                 + " of type: " + producerDestination.getClass());
-
-        Destination[] destinations = getDestinations();
-        consumers = new MessageConsumer[destinations.length];
-        messageLists = new List[destinations.length];
-        for (int i = 0; i < destinations.length; i++) {
-            Destination dest = destinations[i];
-            messageLists[i] = createConcurrentList();
-            consumers[i] = receiveSession.createConsumer(dest);
-            consumers[i].setMessageListener(createMessageListener(i, 
messageLists[i]));
-        }
-
-        LOG.info("Started connections");
-    }
-
-    protected MessageListener createMessageListener(int i, final List<Message> 
messageList) {
-        return new MessageListener() {
-            public void onMessage(Message message) {
-                consumeMessage(message, messageList);
-            }
-        };
-    }
-
-    /**
-     * Returns the subject on which we publish
-     */
-    protected String getSubject() {
-        // return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y";
-        return getPrefix() + "FOO.BAR";
-    }
-
-    /**
-     * Returns the destinations to which we consume
-     */
-    protected Destination[] getDestinations() {
-        // return new Destination[] {SessionImpl.asTopic(getPrefix() + 
"FOO.BAR"),
-        // SessionImpl.asTopic(getPrefix() + "FOO.*"), 
SessionImpl.asTopic(getPrefix() + "FOO.X.Y")};
-        return new Destination[] {SessionImpl.asTopic(getPrefix() + 
"FOO.BAR")};
-    }
-
-    protected String getPrefix() {
-        return super.getSubject() + ".";
-    }
-
-    @SuppressWarnings("unchecked")
-    protected void assertMessagesAreReceived() throws JMSException {
-        waitForMessagesToBeDelivered();
-        int size = messageLists.length;
-        for (int i = 0; i < size; i++) {
-            LOG.info("Message list: " + i + " contains: " + 
messageLists[i].size() + " message(s)");
-        }
-        size = messageLists.length;
-        for (int i = 0; i < size; i++) {
-            assertMessagesReceivedAreValid(messageLists[i]);
-        }
-    }
-
-    protected HedwigConnectionFactoryImpl createConnectionFactory() {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    protected void tearDown() throws Exception {
-        session.close();
-        receiveSession.close();
-
-        sendConnection.close();
-        receiveConnection.close();
-        super.tearDown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
deleted file mode 100644
index 087c5ef..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicSubscriber;
-import junit.framework.Test;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.TestSupport;
-
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.util.Wait;
-//import org.apache.commons.dbcp.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConcurrentProducerDurableConsumerTest extends TestSupport {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class);
-    private int consumerCount = 5;
-    protected List<Connection> connections = Collections.synchronizedList(new 
ArrayList<Connection>());
-    protected Map<MessageConsumer, TimedMessageListener> consumers
-        = new HashMap<MessageConsumer, TimedMessageListener>();
-    protected MessageIdList allMessagesList = new MessageIdList();
-    private int messageSize = 1024;
-
-    public void testSendRateWithActivatingConsumers() throws Exception {
-        final Destination destination = createDestination();
-        final ConnectionFactory factory = createConnectionFactory();
-        startInactiveConsumers(factory, destination);
-
-        Connection connection = factory.createConnection();
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = createMessageProducer(session, destination);
-
-        // preload the durable consumers
-        double[] inactiveConsumerStats = produceMessages(destination, 500, 10, 
session, producer, null);
-        LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1]
-                + ", max: " + inactiveConsumerStats[0] + ", multiplier: "
-                 + (inactiveConsumerStats[0]/inactiveConsumerStats[1]));
-
-        // periodically start a durable sub that has a backlog
-        final int consumersToActivate = 5;
-        final Object addConsumerSignal = new Object();
-        Executors.newCachedThreadPool(new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "ActivateConsumer" + this);
-            }
-        }).execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    MessageConsumer consumer = null;
-                    for (int i = 0; i < consumersToActivate; i++) {
-                        LOG.info("Waiting for add signal from producer...");
-                        synchronized (addConsumerSignal) {
-                            addConsumerSignal.wait(30 * 60 * 1000);
-                        }
-                        TimedMessageListener listener = new 
TimedMessageListener();
-                        consumer = 
createDurableSubscriber(factory.createConnection(),
-                                                           destination, 
"consumer" + (i + 1));
-                        LOG.info("Created consumer " + consumer);
-                        consumer.setMessageListener(listener);
-                        consumers.put(consumer, listener);
-                    }
-                } catch (Exception e) {
-                    LOG.error("failed to start consumer", e);
-                }
-            }
-        });
-
-
-        double[] statsWithActive = produceMessages(destination, 500, 10, 
session, producer, addConsumerSignal);
-
-        LOG.info(" with concurrent activate, ave: " + statsWithActive[1]
-                 + ", max: " + statsWithActive[0] + ", multiplier: " + 
(statsWithActive[0]/ statsWithActive[1]));
-
-        while(consumers.size() < consumersToActivate) {
-            TimeUnit.SECONDS.sleep(2);
-        }
-
-        long timeToFirstAccumulator = 0;
-        for (TimedMessageListener listener : consumers.values()) {
-            long time = listener.getFirstReceipt();
-            timeToFirstAccumulator += time;
-            LOG.info("Time to first " + time);
-        }
-        LOG.info("Ave time to first message =" + 
timeToFirstAccumulator/consumers.size());
-
-        for (TimedMessageListener listener : consumers.values()) {
-            LOG.info("Ave batch receipt time: " + 
listener.waitForReceivedLimit(10000)
-                     + " max receipt: " + listener.maxReceiptTime);
-        }
-
-        //assertTrue("max (" + statsWithActive[0] + ") within reasonable
-        // multiplier of ave (" + statsWithActive[1] + ")",
-        //        statsWithActive[0] < 5 * statsWithActive[1]);
-
-        // compare no active to active
-        LOG.info("Ave send time with active: " + statsWithActive[1]
-                + " as multiplier of ave with none active: " + 
inactiveConsumerStats[1]
-                + ", multiplier=" + 
(statsWithActive[1]/inactiveConsumerStats[1]));
-
-        assertTrue("Ave send time with active: " + statsWithActive[1]
-                + " within reasonable multpler of ave with none active: " + 
inactiveConsumerStats[1]
-                + ", multiplier " + 
(statsWithActive[1]/inactiveConsumerStats[1]),
-                statsWithActive[1] < 15 * inactiveConsumerStats[1]);
-    }
-
-
-    public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
-        Destination destination = createDestination();
-        ConnectionFactory factory = createConnectionFactory();
-        startInactiveConsumers(factory, destination);
-
-        Connection connection = factory.createConnection();
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(destination);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-        final int toSend = 100;
-        final int numIterations = 5;
-
-        double[] noConsumerStats = produceMessages(destination, toSend, 
numIterations, session, producer, null);
-
-        startConsumers(factory, destination);
-        LOG.info("Activated consumer");
-
-        double[] withConsumerStats = produceMessages(destination, toSend, 
numIterations, session, producer, null);
-
-        LOG.info("With consumer: " + withConsumerStats[1] + " , with 
noConsumer: " + noConsumerStats[1]
-                + ", multiplier: " + 
(withConsumerStats[1]/noConsumerStats[1]));
-        final int reasonableMultiplier = 15; // not so reasonable but improving
-        assertTrue("max X times as slow with consumer: " + 
withConsumerStats[1] + ", with no Consumer: "
-                + noConsumerStats[1] + ", multiplier: " + 
(withConsumerStats[1]/noConsumerStats[1]),
-                withConsumerStats[1] < noConsumerStats[1] * 
reasonableMultiplier);
-
-        final int toReceive = toSend * numIterations * consumerCount * 2;
-        Wait.waitFor(new Wait.Condition() {
-            public boolean isSatisified() throws Exception {
-                LOG.info("count: " + allMessagesList.getMessageCount());
-                return toReceive == allMessagesList.getMessageCount();
-            }
-        }, 60 * 1000);
-
-        assertEquals("got all messages", toReceive, 
allMessagesList.getMessageCount());
-    }
-
-
-    private MessageProducer createMessageProducer(Session session, Destination 
destination) throws JMSException {
-        MessageProducer producer = session.createProducer(destination);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        return producer;
-    }
-
-
-    private void startInactiveConsumers(ConnectionFactory factory, Destination 
destination) throws Exception {
-        // create off line consumers
-        startConsumers(factory, destination);
-        for (Connection connection: connections) {
-            connection.close();
-        }
-        connections.clear();
-        consumers.clear();
-    }
-
-
-    protected void startConsumers(ConnectionFactory factory, Destination dest) 
throws Exception {
-        MessageConsumer consumer;
-        for (int i = 0; i < consumerCount; i++) {
-            TimedMessageListener list = new TimedMessageListener();
-            consumer = createDurableSubscriber(factory.createConnection(), 
dest, "consumer" + (i + 1));
-            consumer.setMessageListener(list);
-            consumers.put(consumer, list);
-        }
-    }
-
-    protected TopicSubscriber createDurableSubscriber(Connection conn,
-                                                      Destination dest, String 
name) throws Exception {
-        conn.setClientID(name);
-        connections.add(conn);
-        conn.start();
-
-        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final TopicSubscriber consumer = 
sess.createDurableSubscriber((javax.jms.Topic)dest, name);
-
-        return consumer;
-    }
-
-    /**
-     * @return max and ave send time
-     * @throws Exception
-     */
-    private double[] produceMessages(Destination destination,
-                                     final int toSend,
-                                     final int numIterations,
-                                     Session session,
-                                     MessageProducer producer,
-                                     Object addConsumerSignal) throws 
Exception {
-        long start;
-        long count = 0;
-        double batchMax = 0, max = 0, sum = 0;
-        for (int i=0; i<numIterations; i++) {
-            start = System.currentTimeMillis();
-            for (int j=0; j < toSend; j++) {
-                long singleSendstart = System.currentTimeMillis();
-                TextMessage msg = createTextMessage(session, "" + j);
-                // rotate
-                int priority = ((int)count%10);
-                producer.send(msg, DeliveryMode.PERSISTENT, priority, 0);
-                max = Math.max(max, (System.currentTimeMillis() - 
singleSendstart));
-                if (++count % 500 == 0) {
-                    if (addConsumerSignal != null) {
-                        synchronized (addConsumerSignal) {
-                            addConsumerSignal.notifyAll();
-                            LOG.info("Signalled add consumer");
-                        }
-                    }
-                }
-                ;
-                if (count % 5000 == 0) {
-                    LOG.info("Sent " + count + ", singleSendMax:" + max);
-                }
-
-            }
-            long duration = System.currentTimeMillis() - start;
-            batchMax = Math.max(batchMax, duration);
-            sum += duration;
-            LOG.info("Iteration " + i + ", sent " + toSend + ", time: "
-                    + duration + ", batchMax:" + batchMax + ", singleSendMax:" 
+ max);
-        }
-
-        LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax 
+ " singleSendMax: " + max);
-        return new double[]{batchMax, sum/numIterations};
-    }
-
-    protected TextMessage createTextMessage(Session session, String initText) 
throws Exception {
-        TextMessage msg = session.createTextMessage();
-
-        // Pad message text
-        if (initText.length() < messageSize) {
-            char[] data = new char[messageSize - initText.length()];
-            Arrays.fill(data, '*');
-            String str = new String(data);
-            msg.setText(initText + str);
-
-            // Do not pad message text
-        } else {
-            msg.setText(initText);
-        }
-
-        return msg;
-    }
-
-    @Override
-    protected void setUp() throws Exception {
-        topic = true;
-        super.setUp();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        for (Iterator<Connection> iter = connections.iterator(); 
iter.hasNext();) {
-            Connection conn = iter.next();
-            try {
-                conn.close();
-            } catch (Throwable e) {
-            }
-        }
-        allMessagesList.flushMessages();
-        consumers.clear();
-        super.tearDown();
-    }
-
-
-    protected HedwigConnectionFactoryImpl createConnectionFactory() throws 
Exception {
-        HedwigConnectionFactoryImpl factory = new 
HedwigConnectionFactoryImpl();
-        return factory;
-    }
-
-    public static Test suite() {
-        return suite(ConcurrentProducerDurableConsumerTest.class);
-    }
-
-    class TimedMessageListener implements MessageListener {
-        final int batchSize = 1000;
-        CountDownLatch firstReceiptLatch = new CountDownLatch(1);
-        long mark = System.currentTimeMillis();
-        long firstReceipt = 0l;
-        long receiptAccumulator = 0;
-        long batchReceiptAccumulator = 0;
-        long maxReceiptTime = 0;
-        AtomicLong count = new AtomicLong(0);
-        Map<Integer, MessageIdList> messageLists
-            = new ConcurrentHashMap<Integer, MessageIdList>(new 
HashMap<Integer, MessageIdList>());
-
-        @Override
-        public void onMessage(Message message) {
-            final long current = System.currentTimeMillis();
-            final long duration = current - mark;
-            receiptAccumulator += duration;
-            int priority = 0;
-            try {
-                priority = message.getJMSPriority();
-            } catch (JMSException ignored) {}
-            if (!messageLists.containsKey(priority)) {
-                messageLists.put(priority, new MessageIdList());
-            }
-            messageLists.get(priority).onMessage(message);
-            if (count.incrementAndGet() == 1) {
-                firstReceipt = duration;
-                firstReceiptLatch.countDown();
-                LOG.info("First receipt in " + firstReceipt + "ms");
-            } else if (count.get() % batchSize == 0) {
-                LOG.info("Consumed " + count.get() + " in "
-                         + batchReceiptAccumulator + "ms" + ", priority:" + 
priority);
-                batchReceiptAccumulator=0;
-            }
-            maxReceiptTime = Math.max(maxReceiptTime, duration);
-            receiptAccumulator += duration;
-            batchReceiptAccumulator += duration;
-            mark = current;
-        }
-
-        long getMessageCount() {
-            return count.get();
-        }
-
-        long getFirstReceipt() throws Exception {
-            firstReceiptLatch.await(30, TimeUnit.SECONDS);
-            return firstReceipt;
-        }
-
-        public long waitForReceivedLimit(long limit) throws Exception {
-            final long expiry = System.currentTimeMillis() + 30*60*1000;
-            while (count.get() < limit) {
-                if (System.currentTimeMillis() > expiry) {
-                    throw new RuntimeException("Expired waiting for X 
messages, " + limit);
-                }
-                TimeUnit.SECONDS.sleep(2);
-                String missing = findFirstMissingMessage();
-                if (missing != null) {
-                    LOG.info("first missing = " + missing);
-                    throw new RuntimeException("We have a missing message. " + 
missing);
-                }
-
-            }
-            return receiptAccumulator/(limit/batchSize);
-        }
-
-        private String findFirstMissingMessage() {
-            /*
-            MessageId current = new MessageId();
-            for (MessageIdList priorityList : messageLists.values()) {
-                MessageId previous = null;
-                for (String id : priorityList.getMessageIds()) {
-                    current.setValue(id);
-                    if (previous == null) {
-                        previous = current.copy();
-                    } else {
-                        if (current.getProducerSequenceId() - 1 != 
previous.getProducerSequenceId() &&
-                            current.getProducerSequenceId() - 10 !=  
previous.getProducerSequenceId()) {
-                                return "Missing next after: " + previous + ", 
got: " + current;
-                        } else {
-                            previous = current.copy();
-                        }
-                    }
-                }
-            }
-            return null;
-            */
-            return null;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
deleted file mode 100644
index b14ef71..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.JmsConnectionStartStopTest;
-
-
-
-
-/**
- * Test case intended to demonstrate delivery interruption to queue consumers 
when
- * a JMS selector leaves some messages on the queue (due to use of a JMS 
Selector)
- *
- * testNonDiscriminatingConsumer() demonstrates proper functionality for 
consumers that don't use
- * a selector to qualify their input.
- *
- * testDiscriminatingConsumer() demonstrates the failure condition in which 
delivery to the consumer
- * eventually halts.
- *
- * The expected behavior is for the delivery to the client to be maintained 
regardless of the depth
- * of the queue, particularly when the messages in the queue do not meet the 
selector criteria of the
- * client.
- *
- * https://issues.apache.org/activemq/browse/AMQ-2217
- *
- */
-public class DiscriminatingConsumerLoadTest extends TestSupport {
-
-    private static final org.apache.commons.logging.Log LOG = 
org.apache.commons.logging.LogFactory
-        .getLog(DiscriminatingConsumerLoadTest.class);
-
-    private Connection producerConnection;
-    private Connection consumerConnection;
-    private int counterSent = 0;
-    private int counterReceived = 0;
-
-    public static final String JMSTYPE_EATME  = 
"DiscriminatingLoadClient.EatMe";
-    public static final String JMSTYPE_IGNOREME = 
"DiscriminatingLoadClient.IgnoreMe";
-
-    private int testSize = 5000; // setting this to a small number will pass 
all tests
-
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        this.producerConnection = this.createConnection();
-        this.consumerConnection = this.createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        if (producerConnection != null) {
-            producerConnection.close();
-            producerConnection = null;
-        }
-        if (consumerConnection != null) {
-            consumerConnection.close();
-            consumerConnection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * Test to check if a single consumer with no JMS selector will receive 
all intended messages
-     *
-     * @throws java.lang.Exception
-     */
-    public void testNonDiscriminatingConsumer() throws Exception {
-        consumerConnection = createConnection();
-        consumerConnection.start();
-        LOG.info("consumerConnection = " +consumerConnection);
-
-        try {Thread.sleep(1000); } catch (Exception e) {}
-
-        // here we pass in null for the JMS selector
-        Consumer consumer = new Consumer(consumerConnection, null);
-        Thread consumerThread = new Thread(consumer);
-
-        consumerThread.start();
-
-        producerConnection = createConnection();
-        producerConnection.start();
-        LOG.info("producerConnection = " +producerConnection);
-
-        try {Thread.sleep(3000); } catch (Exception e) {}
-
-        Producer producer = new Producer(producerConnection);
-        Thread producerThread = new Thread(producer);
-        producerThread.start();
-
-        // now that everything is running, let's wait for the consumer thread 
to finish ...
-        consumerThread.join();
-        producer.stop = true;
-
-        if (consumer.getCount() == testSize )
-            LOG.info("test complete .... all messsages consumed!!");
-        else
-            LOG.info("test failed .... Sent " + (testSize / 1) +
-                     " messages intended to be consumed ( " + testSize
-                     + " total), but only consumed " + consumer.getCount());
-
-
-        assertTrue("Sent " + testSize + " messages intended to be consumed, 
but only consumed " + consumer.getCount(),
-                   (consumer.getCount() == testSize ));
-        assertFalse("Delivery of messages to consumer was halted during this 
test", consumer.deliveryHalted());
-
-
-    }
-
-    /**
-     * Test to check if a single consumer with a JMS selector will receive all 
intended messages
-     *
-     * @throws java.lang.Exception
-     */
-    public void testDiscriminatingConsumer() throws Exception {
-
-        consumerConnection = createConnection();
-        consumerConnection.start();
-        LOG.info("consumerConnection = " +consumerConnection);
-
-        try {Thread.sleep(1000); } catch (Exception e) {}
-
-        // here we pass the JMS selector we intend to consume
-        Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME);
-        Thread consumerThread = new Thread(consumer);
-
-        consumerThread.start();
-
-        producerConnection = createConnection();
-        producerConnection.start();
-        LOG.info("producerConnection = " +producerConnection);
-
-        try {Thread.sleep(3000); } catch (Exception e) {}
-
-        Producer producer = new Producer(producerConnection);
-        Thread producerThread = new Thread(producer);
-        producerThread.start();
-
-        // now that everything is running, let's wait for the consumer thread 
to finish ...
-        consumerThread.join();
-        producer.stop = true;
-
-        if (consumer.getCount() == (testSize / 2))
-        {
-            LOG.info("test complete .... all messsages consumed!!");
-        }
-        else
-            {
-                LOG.info("test failed .... Sent " + testSize
-                         + " original messages, only half of which (" + 
(testSize / 2) +
-                         ") were intended to be consumed: consumer paused at: 
" + consumer.getCount());
-                //System.out.println("test failed .... Sent " + testSize
-                //+ " original messages, only half of which (" + (testSize / 
2) +
-                // ") were intended to be consumed: consumer paused at: " + 
consumer.getCount());
-            }
-        assertTrue("Sent " + testSize  + " original messages, only half of 
which (" + (testSize / 2) +
-                   ") were intended to be consumed: consumer paused at: " + 
consumer.getCount(),
-                   (consumer.getCount() == (testSize / 2)));
-        assertTrue("Delivery of messages to consumer was halted during this 
test as it only wants half",
-                   consumer.deliveryHalted());
-    }
-
-    /**
-     * Helper class that will publish 2 * testSize messages.  The messages 
will be distributed evenly
-     * between the following two JMS types:
-     *
-     * @see JMSTYPE_INTENDED_FOR_CONSUMPTION
-     * @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION
-     *
-     */
-    private class Producer extends Thread
-    {
-        private int counterSent = 0;
-        private Connection connection = null;
-        public boolean stop = false;
-
-        public Producer(Connection connection)
-        {
-            this.connection = connection;
-        }
-
-        public void run() {
-            try {
-                final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-                final Topic queue = session.createTopic("test");
-
-                // wait for 10 seconds to allow consumer.receive to be run
-                // first
-                Thread.sleep(10000);
-                MessageProducer producer = session.createProducer(queue);
-
-                while (!stop && (counterSent < testSize))
-                    {
-                        // first send a message intended to be consumed ....
-                        TextMessage message = session.createTextMessage("*** 
Ill ....... Ini ***");  // alma mater ...
-                        message.setJMSType(JMSTYPE_EATME);
-                        //LOG.info("sending .... JMSType = " + 
message.getJMSType());
-                        
producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
-
-                        counterSent++;
-
-                        // now send a message intended to be consumed by some 
other consumer in the the future
-                        // ... we expect these messages to accrue in the queue
-                        message = session.createTextMessage("*** Ill ....... 
Ini ***");  // alma mater ...
-                        message.setJMSType(JMSTYPE_IGNOREME);
-                        //LOG.info("sending .... JMSType = " + 
message.getJMSType());
-                        
producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
-
-                        counterSent++;
-                    }
-
-                session.close();
-
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-            LOG.info("producer thread complete ... " + counterSent + " 
messages sent to the queue");
-        }
-
-        public int getCount()
-        {
-            return this.counterSent;
-        }
-    }
-
-    /**
-     * Helper class that will consume messages from the queue based on the 
supplied JMS selector.
-     * Thread will stop after the first receive(..) timeout, or once all 
expected messages have
-     * been received (see testSize).  If the thread stops due to a timeout, it 
is experiencing the
-     * delivery pause that is symptomatic of a bug in the broker.
-     */
-    private class Consumer extends Thread
-    {
-        protected int counterReceived = 0;
-        private Connection connection = null;
-        private String jmsSelector = null;
-        private boolean deliveryHalted = false;
-
-        public Consumer(Connection connection, String jmsSelector)
-        {
-            this.connection = connection;
-            this.jmsSelector = jmsSelector;
-        }
-
-        public void run() {
-            boolean testComplete = false;
-            try {
-                Session session = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-                final Topic queue = session.createTopic("test");
-                MessageConsumer consumer = null;
-                if (null != this.jmsSelector)
-                    {
-                        consumer = session.createConsumer(queue, "JMSType='" + 
this.jmsSelector + "'");
-                    }
-                else
-                    {
-                        consumer = session.createConsumer(queue);
-                    }
-
-                while (!deliveryHalted && (counterReceived < testSize))
-                    {
-                        TextMessage result = (TextMessage) 
consumer.receive(30000);
-                        if (result != null) {
-                            counterReceived++;
-                            //System.out.println("consuming .... JMSType = " + 
result.getJMSType()
-                            // + " received = " + counterReceived);
-                            LOG.info("consuming .... JMSType = " + 
result.getJMSType()
-                                     + " received = " + counterReceived);
-                        } else
-                            {
-                                LOG.info("consuming .... timeout while waiting 
for a message ."
-                                         + ".. broker must have stopped 
delivery ...  received = " + counterReceived);
-                                deliveryHalted = true;
-                            }
-                    }
-                session.close();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-        }
-
-        public int getCount()
-        {
-            return this.counterReceived;
-        }
-
-        public boolean deliveryHalted()
-        {
-            return this.deliveryHalted;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
deleted file mode 100644
index fa79d69..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Topic;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import junit.framework.TestCase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DispatchMultipleConsumersTest extends JmsTestBase {
-    private final static Logger logger = 
LoggerFactory.getLogger(DispatchMultipleConsumersTest.class);
-    Destination dest;
-    String destinationName = "TEST.Q";
-    String msgStr = "Test text message";
-    int messagesPerThread = 20;
-    int producerThreads = 50;
-    int consumerCount = 2;
-    AtomicInteger sentCount;
-    AtomicInteger consumedCount;
-    CountDownLatch producerLatch;
-    CountDownLatch consumerLatch;
-    String userName = "";
-    String password = "";
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        dest = SessionImpl.asTopic(destinationName);
-    }
-
-    private void resetCounters() {
-        sentCount = new AtomicInteger(0);
-        consumedCount = new AtomicInteger(0);
-        producerLatch = new CountDownLatch(producerThreads);
-        consumerLatch = new CountDownLatch(consumerCount);
-    }
-
-    public void testDispatch1() {
-        for (int i = 1; i <= 5; i++) {
-            resetCounters();
-            dispatch();
-            assertEquals("Incorrect messages in Iteration " + i, 
sentCount.get() * consumerCount, consumedCount.get());
-        }
-    }
-
-    private void dispatch() {
-        startConsumers();
-        startProducers();
-        try {
-            producerLatch.await();
-            consumerLatch.await();
-        } catch (InterruptedException e) {
-            fail("test interrupted!");
-        }
-    }
-
-    private void startConsumers() {
-        HedwigConnectionFactoryImpl connFactory = new 
HedwigConnectionFactoryImpl();
-        Connection conn;
-        try {
-            conn = connFactory.createConnection(userName, password);
-            conn.start();
-            for (int i = 0; i < consumerCount; i++) {
-                ConsumerThread th = new ConsumerThread(conn, 
"ConsumerThread"+i);
-                th.start();
-            }
-        } catch (JMSException e) {
-            logger.error("Failed to start consumers", e);
-        }
-    }
-
-    private void startProducers() {
-        HedwigConnectionFactoryImpl connFactory = new 
HedwigConnectionFactoryImpl();
-        for (int i = 0; i < producerThreads; i++) {
-            Thread th = new ProducerThread(connFactory, messagesPerThread, 
"ProducerThread"+i);
-            th.start();
-        }
-    }
-
-    private class ConsumerThread extends Thread {
-        private Session session;
-        private MessageConsumer consumer;
-
-        public ConsumerThread(Connection conn, String name) {
-            super();
-            this.setName(name);
-            logger.trace("Created new consumer thread:" + name);
-            try {
-                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                consumer = session.createConsumer(dest);
-            } catch (JMSException e) {
-                logger.error("Failed to start consumer thread:" + name, e);
-            }
-        }
-
-        @Override
-        public void run() {
-            int msgCount = 0;
-            int nullCount = 0;
-            while (true) {
-                try {
-                    Message msg = consumer.receive(200);
-                    if (msg == null) {
-                        if (producerLatch.getCount() > 0) {
-                            continue;
-                        }
-                        nullCount++;
-                        if (nullCount > 10) {
-                            //assume that we are not getting any more messages
-                            break;
-                        } else {
-                            continue;
-                        }
-                    } else {
-                        nullCount = 0;
-                    }
-                    // Thread.sleep(100);
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Message received:" + 
msg.getJMSMessageID());
-                    }
-                    msgCount++;
-                } catch (JMSException e) {
-                    logger.error("Failed to consume:", e);
-                    /*
-                } catch (InterruptedException e) {
-                    logger.error("Interrupted!", e);
-                    */
-                }
-            }
-            try {
-                consumer.close();
-            } catch (JMSException e) {
-                logger.error("Failed to close consumer " + getName(), e);
-            }
-            consumedCount.addAndGet(msgCount);
-            consumerLatch.countDown();
-            logger.trace("Consumed " + msgCount + " messages using thread " + 
getName());
-        }
-    }
-
-    private class ProducerThread extends Thread {
-        private int count;
-        private Connection conn;
-        private Session session;
-        private MessageProducer producer;
-
-        public ProducerThread(HedwigConnectionFactoryImpl connFactory, int 
count, String name) {
-            super();
-            this.count = count;
-            this.setName(name);
-            logger.trace("Created new producer thread:" + name);
-            try {
-                conn = connFactory.createConnection();
-                conn.start();
-                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                producer = session.createProducer(dest);
-            } catch (JMSException e) {
-                logger.error("Failed to start producer thread:" + name, e);
-            }
-        }
-
-        @Override
-        public void run() {
-            int i = 0;
-            try {
-                for (; i < count; i++) {
-                    producer.send(session.createTextMessage(msgStr));
-                    // Thread.sleep(500);
-                }
-                conn.close();
-            } catch (JMSException e) {
-                logger.error(e.getMessage(), e);
-                /*
-            } catch (InterruptedException e) {
-                logger.error("Interrupted!", e);
-                */
-            }
-            sentCount.addAndGet(i);
-            producerLatch.countDown();
-            if (logger.isTraceEnabled()) {
-                logger.trace("Sent " + i + " messages from thread " + 
getName());
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
 
b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
deleted file mode 100644
index c4fa74d..0000000
--- 
a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.test.TestSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DurableConsumerCloseAndReconnectTest extends TestSupport {
-    protected static final long RECEIVE_TIMEOUT = 5000L;
-    private static final Logger LOG = 
LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class);
-
-    protected Connection connection;
-    private Session session;
-    private MessageConsumer consumer;
-    private MessageProducer producer;
-    private Destination destination;
-    private int messageCount;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        deleteAllMessages();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        super.tearDown();
-        deleteAllMessages();
-    }
-
-    private void deleteAllMessages() throws Exception {
-        HedwigConnectionFactoryImpl fac = new HedwigConnectionFactoryImpl();
-        Connection dummyConnection = fac.createConnection();
-        dummyConnection.start();
-        dummyConnection.close();
-    }
-
-    protected HedwigConnectionFactoryImpl createConnectionFactory() throws 
Exception {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    public void testCreateDurableConsumerCloseThenReconnect() throws Exception 
{
-        // force the server to stay up across both connection tests
-        Connection dummyConnection = createConnection();
-        dummyConnection.start();
-
-        consumeMessagesDeliveredWhileConsumerClosed();
-
-        dummyConnection.close();
-
-        // now lets try again without one connection open
-        consumeMessagesDeliveredWhileConsumerClosed();
-    }
-
-    protected void consumeMessagesDeliveredWhileConsumerClosed() throws 
Exception {
-        makeConsumer();
-        closeConsumer();
-
-        publish();
-
-        // wait a few moments for the close to really occur
-        Thread.sleep(1000);
-
-        makeConsumer();
-
-        Message message = consumer.receive(RECEIVE_TIMEOUT);
-        assertTrue("Should have received a message!", message != null);
-
-        closeConsumer();
-
-        LOG.info("Now lets create the consumer again and because we didn't 
ack, we should get it again");
-        makeConsumer();
-
-        message = consumer.receive(RECEIVE_TIMEOUT);
-        assertTrue("Should have received a message!", message != null);
-        message.acknowledge();
-
-        closeConsumer();
-
-        LOG.info("Now lets create the consumer again and because we did ack, 
we should not get it again");
-        makeConsumer();
-
-        message = consumer.receive(2000);
-        assertTrue("Should have no more messages left!", message == null);
-
-        closeConsumer();
-
-        LOG.info("Lets publish one more message now");
-        publish();
-
-        makeConsumer();
-        message = consumer.receive(RECEIVE_TIMEOUT);
-        assertTrue("Should have received a message!", message != null);
-        message.acknowledge();
-
-        closeConsumer();
-    }
-
-    protected void publish() throws Exception {
-        connection = createConnection();
-        connection.start();
-
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        destination = createDestination();
-
-        producer = session.createProducer(destination);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        TextMessage msg = session.createTextMessage("This is a test: " + 
messageCount++);
-        producer.send(msg);
-
-        producer.close();
-        producer = null;
-        closeSession();
-    }
-
-    protected Destination createDestination() throws JMSException {
-        if (isTopic()) {
-            return session.createTopic(getSubject());
-        } else {
-            return session.createTopic(getSubject());
-        }
-    }
-
-    protected boolean isTopic() {
-        return true;
-    }
-
-    protected void closeConsumer() throws JMSException {
-        consumer.close();
-        consumer = null;
-        closeSession();
-    }
-
-    protected void closeSession() throws JMSException {
-        session.close();
-        session = null;
-        connection.close();
-        connection = null;
-    }
-
-    protected void makeConsumer() throws Exception {
-        String durableName = getName();
-        String clientID = getSubject();
-        LOG.info("Creating a durable subscribe for clientID: " + clientID + " 
and durable name: " + durableName);
-        createSession(clientID);
-        consumer = createConsumer(durableName);
-    }
-
-    private MessageConsumer createConsumer(String durableName) throws 
JMSException {
-        if (destination instanceof Topic) {
-            return session.createDurableSubscriber((Topic)destination, 
durableName);
-        } else {
-            return session.createConsumer(destination);
-        }
-    }
-
-    protected void createSession(String clientID) throws Exception {
-        connection = createConnection();
-        connection.setClientID(clientID);
-        connection.start();
-
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        destination = createDestination();
-    }
-}

Reply via email to