Author: orudyy
Date: Thu Nov 19 15:16:18 2015
New Revision: 1715194
URL: http://svn.apache.org/viewvc?rev=1715194&view=rev
Log:
QPID-6869: Add system test that ensures messaging is reliable when the client
is disconnected abruptly
Added:
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
Added:
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1715194&view=auto
==============================================================================
---
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
(added)
+++
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
Thu Nov 19 15:16:18 2015
@@ -0,0 +1,553 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic implementation of TCP traffic forwarder between ports.
+ * It is intended to use in tests.
+ */
+public class TCPTunneler
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TCPTunneler.class);
+
+ private final TCPWorker _tcpWorker;
+ private final ExecutorService _executor;
+
+ public TCPTunneler(final String targetHost,
+ final int targetPort,
+ final int proxyPort,
+ final int numberOfConcurrentClients)
+ {
+ _executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2
+ 1);
+ _tcpWorker = new TCPWorker(proxyPort, targetHost, targetPort,
_executor);
+ }
+
+ public void start() throws IOException
+ {
+ _tcpWorker.start();
+ }
+
+ public void stop()
+ {
+ try
+ {
+ _tcpWorker.stop();
+ }
+ finally
+ {
+ _executor.shutdown();
+ }
+ }
+
+ public void addClientListener(TunnelListener listener)
+ {
+ _tcpWorker.addClientListener(listener);
+ }
+
+ public void removeClientListener(TunnelListener listener)
+ {
+ _tcpWorker.removeClientListener(listener);
+ }
+
+ public void disconnect(InetSocketAddress address)
+ {
+ LOGGER.info("Disconnecting {}", address);
+ if (address != null)
+ {
+ _tcpWorker.disconnect(address);
+ }
+ }
+
+    /**
+     * Callback interface notified when a client tunnel is established or torn down.
+     * Declared public because it is the parameter type of the public
+     * addClientListener/removeClientListener methods of this class.
+     */
+    public interface TunnelListener
+    {
+        /**
+         * Invoked after a tunnel for the given client has been started.
+         *
+         * @param clientAddress remote address of the tunneled client
+         */
+        void clientConnected(InetSocketAddress clientAddress);
+
+        /**
+         * Invoked after the tunnel for the given client has been closed.
+         *
+         * @param clientAddress remote address of the tunneled client
+         */
+        void clientDisconnected(InetSocketAddress clientAddress);
+    }
+
+ public static class NoopTunnelListener implements TunnelListener
+ {
+ @Override
+ public void clientConnected(final InetSocketAddress clientAddress)
+ {
+ }
+
+ @Override
+ public void clientDisconnected(final InetSocketAddress clientAddress)
+ {
+ }
+ }
+
+ public static class TCPWorker implements Runnable
+ {
+ private final String _targetHost;
+ private final int _targetPort;
+ private final int _localPort;
+ private final String _hostPort;
+ private final AtomicBoolean _closed;
+ private final Collection<SocketTunnel> _tunnels;
+ private final Collection<TunnelListener> _tunnelListeners;
+ private final TunnelListener _notifyingListener;
+ private volatile ServerSocket _serverSocket;
+ private volatile ExecutorService _executor;
+
+ public TCPWorker(final int localPort,
+ final String targetHost,
+ final int targetPort,
+ final ExecutorService executor)
+ {
+ _closed = new AtomicBoolean();
+ _targetHost = targetHost;
+ _targetPort = targetPort;
+ _localPort = localPort;
+ _hostPort = _targetHost + ":" + _targetPort;
+ _executor = executor;
+ _tunnels = new CopyOnWriteArrayList<>();
+ _tunnelListeners = new CopyOnWriteArrayList<>();
+ _notifyingListener = new NoopTunnelListener()
+ {
+ @Override
+ public void clientConnected(final InetSocketAddress
clientAddress)
+ {
+ notifyClientConnected(clientAddress);
+ }
+
+ @Override
+ public void clientDisconnected(final InetSocketAddress
clientAddress)
+ {
+ try
+ {
+ notifyClientDisconnected(clientAddress);
+ }
+ finally
+ {
+ removeTunnel(clientAddress);
+ }
+ }
+ };
+ }
+
+        /**
+         * Accept loop. Blocks on the listening socket and creates a tunnel for every
+         * accepted client until the worker is closed or accepting fails. The executing
+         * thread is renamed for the duration of the loop and its name restored on exit.
+         */
+        @Override
+        public void run()
+        {
+            final Thread currentThread = Thread.currentThread();
+            final String threadName = currentThread.getName();
+            try
+            {
+                currentThread.setName("TCPTunnelerAcceptingThread");
+                while (!_closed.get())
+                {
+                    // Snapshot the volatile field: closeServerSocket() may null it
+                    // between the loop condition and the accept() call.
+                    final ServerSocket acceptingSocket = _serverSocket;
+                    if (acceptingSocket == null)
+                    {
+                        break;
+                    }
+
+                    Socket clientSocket = acceptingSocket.accept();
+                    LOGGER.debug("Client opened socket {}", clientSocket);
+
+                    createTunnel(clientSocket);
+                }
+            }
+            catch (IOException e)
+            {
+                // closing the server socket from stop() ends accept() with an
+                // IOException; only report failures which were not requested
+                if (!_closed.get())
+                {
+                    LOGGER.error("Exception in accepting thread", e);
+                }
+            }
+            finally
+            {
+                closeServerSocket();
+                _closed.set(true);
+                currentThread.setName(threadName);
+            }
+        }
+
+        /**
+         * Opens the listening socket on the local port and submits the accept loop
+         * to the executor.
+         *
+         * @throws RuntimeException if the port cannot be bound or the accept loop
+         *                          cannot be scheduled; the listening socket is closed
+         *                          in both cases
+         */
+        public void start()
+        {
+            LOGGER.info("Starting TCPTunneler forwarding from port {} to {}", _localPort, _hostPort);
+            try
+            {
+                // SO_REUSEADDR has to be configured before the socket is bound,
+                // so create the socket unbound and bind it explicitly afterwards.
+                _serverSocket = new ServerSocket();
+                _serverSocket.setReuseAddress(true);
+                _serverSocket.bind(new InetSocketAddress(_localPort));
+            }
+            catch (IOException e)
+            {
+                closeServerSocket();
+                throw new RuntimeException("Cannot start TCPTunneler on port " + _localPort, e);
+            }
+
+            LOGGER.info("Listening on port {}", _localPort);
+            try
+            {
+                _executor.execute(this);
+            }
+            catch (Exception e)
+            {
+                // closeServerSocket() logs and swallows IOException itself, so the
+                // original cause is always the one reported to the caller
+                closeServerSocket();
+                throw new RuntimeException("Cannot start acceptor thread for TCPTunneler on port " + _localPort, e);
+            }
+        }
+
+ public void stop()
+ {
+ if (_closed.compareAndSet(false, true))
+ {
+ LOGGER.info("Stopping TCPTunneler forwarding from port {} to
{}",
+ _localPort,
+ _hostPort);
+ try
+ {
+ for (SocketTunnel tunnel : _tunnels)
+ {
+ tunnel.close();
+ }
+ }
+ finally
+ {
+ closeServerSocket();
+ }
+
+ LOGGER.info("TCPTunneler forwarding from port {} to {} is
stopped",
+ _localPort,
+ _hostPort);
+ }
+ }
+
+ public void addClientListener(TunnelListener listener)
+ {
+ _tunnelListeners.add(listener);
+ for (SocketTunnel socketTunnel : _tunnels)
+ {
+ try
+ {
+ listener.clientConnected(socketTunnel.getClientAddress());
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Exception on notifying client listener about
connected client", e);
+ }
+ }
+ }
+
+ public void removeClientListener(TunnelListener listener)
+ {
+ _tunnelListeners.remove(listener);
+ }
+
+        /**
+         * Removes the tunnel registered for the given client address and closes it.
+         *
+         * @param address remote address of the client whose tunnel should be closed
+         */
+        public void disconnect(final InetSocketAddress address)
+        {
+            SocketTunnel client = removeTunnel(address);
+            if (client == null)
+            {
+                LOGGER.info("Tunnel for {} not found", address);
+            }
+            else if (client.isClosed())
+            {
+                // found but closed earlier: do not misreport it as missing
+                LOGGER.info("Tunnel for {} is already closed", address);
+            }
+            else
+            {
+                client.close();
+                LOGGER.info("Tunnel for {} is disconnected", address);
+            }
+        }
+
+
+        /**
+         * Connects to the target host and starts forwarding traffic between the
+         * accepted client socket and the target. On any failure both sockets are
+         * closed and the tunnel, if it was already registered, is unregistered.
+         *
+         * @param clientSocket socket accepted from the connecting client
+         */
+        private void createTunnel(final Socket clientSocket)
+        {
+            Socket serverSocket = null;
+            SocketTunnel tunnel = null;
+            try
+            {
+                LOGGER.debug("Opening socket to {} for {}", _hostPort, clientSocket);
+                serverSocket = new Socket(_targetHost, _targetPort);
+                LOGGER.debug("Opened socket to {} for {}", serverSocket, clientSocket);
+                tunnel = new SocketTunnel(clientSocket, serverSocket, _notifyingListener);
+                LOGGER.debug("Socket tunnel is created from {} to {}", clientSocket, serverSocket);
+                _tunnels.add(tunnel);
+                tunnel.start(_executor);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Cannot forward i/o traffic between {} and {}", clientSocket, _hostPort, e);
+                if (tunnel != null)
+                {
+                    // start() failed after registration: do not keep a dead tunnel around
+                    _tunnels.remove(tunnel);
+                }
+                SocketTunnel.closeSocket(clientSocket);
+                SocketTunnel.closeSocket(serverSocket);
+            }
+        }
+
+ private void notifyClientConnected(final InetSocketAddress
clientAddress)
+ {
+ for (TunnelListener listener : _tunnelListeners)
+ {
+ try
+ {
+ listener.clientConnected(clientAddress);
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Exception on notifying client listener about
connected client", e);
+ }
+ }
+ }
+
+
+ private void notifyClientDisconnected(final InetSocketAddress
clientAddress)
+ {
+ for (TunnelListener listener : _tunnelListeners)
+ {
+ try
+ {
+ listener.clientDisconnected(clientAddress);
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Exception on notifying client listener about
disconnected client", e);
+ }
+ }
+ }
+
+ private void closeServerSocket()
+ {
+ if (_serverSocket != null)
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Exception on closing of accepting socket", e);
+ }
+ finally
+ {
+ _serverSocket = null;
+ }
+ }
+ }
+
+
+ private SocketTunnel removeTunnel(final InetSocketAddress
clientAddress)
+ {
+ SocketTunnel client = null;
+ for (SocketTunnel c : _tunnels)
+ {
+ if (c.isClientAddress(clientAddress))
+ {
+ client = c;
+ break;
+ }
+ }
+ if (client != null)
+ {
+ _tunnels.remove(client);
+ }
+ return client;
+ }
+
+ }
+
+ public static class SocketTunnel
+ {
+ private final Socket _clientSocket;
+ private final Socket _serverSocket;
+ private final TunnelListener _tunnelListener;
+ private final AtomicBoolean _closed;
+ private final ClosableStreamForwarder _inputStreamForwarder;
+ private final ClosableStreamForwarder _outputStreamForwarder;
+ private final InetSocketAddress _clientSocketAddress;
+
+ public SocketTunnel(final Socket clientSocket,
+ final Socket serverSocket,
+ final TunnelListener tunnelListener) throws
IOException
+ {
+ _clientSocket = clientSocket;
+ _clientSocketAddress =
+ new
InetSocketAddress(clientSocket.getInetAddress().getHostName(),
_clientSocket.getPort());
+ _serverSocket = serverSocket;
+ _closed = new AtomicBoolean();
+ _tunnelListener = tunnelListener;
+ _clientSocket.setKeepAlive(true);
+ _serverSocket.setKeepAlive(true);
+ _inputStreamForwarder = new ClosableStreamForwarder(new
StreamForwarder(_clientSocket, _serverSocket));
+ _outputStreamForwarder = new ClosableStreamForwarder(new
StreamForwarder(_serverSocket, _clientSocket));
+ }
+
+ public void close()
+ {
+ if (_closed.compareAndSet(false, true))
+ {
+ try
+ {
+ closeSocket(_serverSocket);
+ closeSocket(_clientSocket);
+ }
+ finally
+ {
+ _tunnelListener.clientDisconnected(getClientAddress());
+ }
+ }
+ }
+
+ public void start(Executor executor) throws IOException
+ {
+ executor.execute(_inputStreamForwarder);
+ executor.execute(_outputStreamForwarder);
+ _tunnelListener.clientConnected(getClientAddress());
+ }
+
+ public boolean isClosed()
+ {
+ return _closed.get();
+ }
+
+ public boolean isClientAddress(final InetSocketAddress clientAddress)
+ {
+ return getClientAddress().equals(clientAddress);
+ }
+
+ public InetSocketAddress getClientAddress()
+ {
+ return _clientSocketAddress;
+ }
+
+
+ private static void closeSocket(Socket socket)
+ {
+ if (socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Exception on closing of socket {}", socket,
e);
+ }
+ }
+ }
+
+
+ private class ClosableStreamForwarder implements Runnable
+ {
+ private StreamForwarder _streamForwarder;
+
+ public ClosableStreamForwarder(StreamForwarder streamForwarder)
+ {
+ _streamForwarder = streamForwarder;
+ }
+
+ @Override
+ public void run()
+ {
+ Thread currentThread = Thread.currentThread();
+ String originalThreadName = currentThread.getName();
+ try
+ {
+ currentThread.setName(_streamForwarder.getName());
+ _streamForwarder.run();
+ }
+ finally
+ {
+ close();
+ currentThread.setName(originalThreadName);
+ }
+ }
+ }
+ }
+
+ public static class StreamForwarder implements Runnable
+ {
+ private static final int BUFFER_SIZE = 4096;
+
+ private final InputStream _inputStream;
+ private final OutputStream _outputStream;
+ private final String _name;
+
+ public StreamForwarder(Socket input, Socket output) throws IOException
+ {
+ _inputStream = input.getInputStream();
+ _outputStream = output.getOutputStream();
+ _name = "Forwarder-" + input.getInetAddress().getHostName() + ":"
+ input.getPort() + "->"
+ + output.getInetAddress().getHostName() + ":" +
output.getPort();
+ }
+
+ @Override
+ public void run()
+ {
+ byte[] buffer = new byte[BUFFER_SIZE];
+ int bytesRead;
+ try
+ {
+ while ((bytesRead = _inputStream.read(buffer)) != -1)
+ {
+ _outputStream.write(buffer, 0, bytesRead);
+ _outputStream.flush();
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Exception on forwarding data for {}: {}", _name,
e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _inputStream.close();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+
+ try
+ {
+ _outputStream.close();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+ }
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+ }
+}
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java?rev=1715194&view=auto
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
(added)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
Thu Nov 19 15:16:18 2015
@@ -0,0 +1,491 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TCPTunneler;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class AbruptClientDisconnectTest extends QpidBrokerTestCase
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbruptClientDisconnectTest.class);
+ private static final String CONNECTION_URL_TEMPLATE =
+
"amqp://guest:guest@clientid/?brokerlist='localhost:%d?failover='false''";
+
+ private TCPTunneler _tcpTunneler;
+ private Connection _tunneledConnection;
+ private ExecutorService _executorService;
+ private Queue _testQueue;
+ private Connection _utilityConnection;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _executorService = Executors.newFixedThreadPool(3);
+
+ _testQueue = getTestQueue();
+ _utilityConnection = getConnection();
+ _utilityConnection.start();
+
+ // create queue
+ consumeIgnoringLastSeenOmission(_utilityConnection, _testQueue, 1, 0,
-1);
+
+ _tcpTunneler = new TCPTunneler("localhost", getPort(),
getFailingPort(), 1);
+ _tcpTunneler.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (_tunneledConnection != null)
+ {
+ _tunneledConnection.close();
+ }
+ }
+ finally
+ {
+ try
+ {
+ if (_tcpTunneler != null)
+ {
+ _tcpTunneler.stop();
+ }
+ }
+ finally
+ {
+ if (_executorService != null)
+ {
+ _executorService.shutdown();
+ }
+ super.tearDown();
+ }
+ }
+
+ }
+
+ public void testMessagingOnAbruptConnectivityLostWhilstPublishing() throws
Exception
+ {
+ final ClientMonitor clientMonitor = new ClientMonitor();
+ _tunneledConnection = createTunneledConnection(clientMonitor);
+ Producer producer =
+ new Producer(_tunneledConnection, _testQueue,
Session.SESSION_TRANSACTED, 0, 10, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
_tcpTunneler.disconnect(clientMonitor.getClientAddress());
+ }
+ }
+ );
+ _executorService.submit(producer);
+ boolean disconnected = clientMonitor.awaitDisconnect(10,
TimeUnit.SECONDS);
+ producer.stop();
+ assertTrue("Client disconnect did not happen", disconnected);
+ assertTrue("Unexpected number of published messages " +
producer.getNumberOfPublished(),
+ producer.getNumberOfPublished() >= 10);
+
+ consumeIgnoringLastSeenOmission(_utilityConnection, _testQueue, 0,
producer.getNumberOfPublished(), -1);
+ }
+
+
+ public void testMessagingOnAbruptConnectivityLostWhilstConsuming() throws
Exception
+ {
+ int minimumNumberOfMessagesToProduce = 40;
+ int minimumNumberOfMessagesToConsume = 20;
+
+ // produce minimum required number of messages before starting
consumption
+ final CountDownLatch queueDataWaiter = new CountDownLatch(1);
+ final Producer producer = new Producer(_utilityConnection,
+ _testQueue,
+ Session.SESSION_TRANSACTED,
+ 0,
+
minimumNumberOfMessagesToProduce,
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
queueDataWaiter.countDown();
+ }
+ });
+
+ // create tunneled connection to consume messages
+ final ClientMonitor clientMonitor = new ClientMonitor();
+ _tunneledConnection = createTunneledConnection(clientMonitor);
+ _tunneledConnection.start();
+
+ // consumer will consume minimum number of messages before abrupt
disconnect
+ Consumer consumer = new Consumer(_tunneledConnection,
+ _testQueue,
+ Session.SESSION_TRANSACTED,
+ minimumNumberOfMessagesToConsume,
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ producer.stop();
+
_tcpTunneler.disconnect(clientMonitor.getClientAddress());
+ }
+ }
+ );
+
+ LOGGER.debug("Waiting for producer to produce {} messages before
consuming", minimumNumberOfMessagesToProduce);
+ _executorService.submit(producer);
+
+ assertTrue("Latch waiting for produced messages was not count down",
queueDataWaiter.await(10, TimeUnit.SECONDS));
+
+ LOGGER.debug("Producer sent {} messages. Starting consumption...",
producer.getNumberOfPublished());
+
+ _executorService.submit(consumer);
+
+ boolean disconnectOccurred = clientMonitor.awaitDisconnect(10,
TimeUnit.SECONDS);
+ consumer.stop();
+ producer.stop();
+
+ LOGGER.debug("Producer sent {} messages. Consumer received {}
messages",
+ producer.getNumberOfPublished(),
+ consumer.getNumberOfConsumed());
+
+ assertTrue("Client disconnect did not happen", disconnectOccurred);
+ assertTrue("Unexpected number of published messages " +
producer.getNumberOfPublished(),
+ producer.getNumberOfPublished() >=
minimumNumberOfMessagesToProduce);
+ assertTrue("Unexpected number of consumed messages " +
consumer.getNumberOfConsumed(),
+ consumer.getNumberOfConsumed() >=
minimumNumberOfMessagesToConsume);
+
+ LOGGER.debug("Remaining number to consume {}.",
+ (producer.getNumberOfPublished() -
consumer.getNumberOfConsumed()));
+ consumeIgnoringLastSeenOmission(_utilityConnection,
+ _testQueue,
+ consumer.getNumberOfConsumed(),
+ producer.getNumberOfPublished(),
+ consumer.getLastSeenMessageIndex());
+
+ }
+
+
+    /**
+     * Creates a connection to the tunneler port and registers the given monitor as
+     * tunnel listener. A connection exception triggers disconnection of the monitored
+     * client tunnel.
+     *
+     * @param clientMonitor listener recording the tunneled client address
+     * @return the connection established through the tunneler port
+     */
+    private Connection createTunneledConnection(final ClientMonitor clientMonitor)
+            throws URLSyntaxException, JMSException
+    {
+        final ConnectionURL url = new AMQConnectionURL(String.format(CONNECTION_URL_TEMPLATE, getFailingPort()));
+        Connection tunneledConnection = getConnection(url);
+        _tcpTunneler.addClientListener(clientMonitor);
+        tunneledConnection.setExceptionListener(new ExceptionListener()
+        {
+            @Override
+            public void onException(final JMSException exception)
+            {
+                // previously stored in a raw, never-read AtomicReference; log it instead
+                LOGGER.debug("Tunneled connection reported an exception", exception);
+                _tcpTunneler.disconnect(clientMonitor.getClientAddress());
+            }
+        });
+        return tunneledConnection;
+    }
+
+ private void consumeIgnoringLastSeenOmission(final Connection connection,
+ final Queue testQueue,
+ int fromIndex,
+ int toIndex,
+ int
consumerLastSeenMessageIndex)
+ throws JMSException
+ {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(testQueue);
+ int expectedIndex = fromIndex;
+ while (expectedIndex < toIndex)
+ {
+ Message message = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Expected message with index " + expectedIndex + "
but got null", message);
+ int messageIndex = message.getIntProperty(INDEX);
+ LOGGER.debug("Received message with index {}, expected index is
{}", messageIndex, expectedIndex);
+ if (messageIndex != expectedIndex
+ && expectedIndex == fromIndex
+ && messageIndex == consumerLastSeenMessageIndex + 1)
+ {
+ LOGGER.debug( "Broker transaction was completed for message {}"
+ + " but there was no network to notify client
about its completion.",
+ consumerLastSeenMessageIndex);
+ expectedIndex = messageIndex;
+ }
+ assertEquals("Unexpected message index", expectedIndex,
messageIndex);
+ expectedIndex++;
+ }
+ session.close();
+ }
+
+    /**
+     * Tunnel listener that records the address passed to the latest clientConnected
+     * call and releases a latch once a disconnect for that address is reported.
+     */
+    private class ClientMonitor extends TCPTunneler.NoopTunnelListener
+    {
+        private final CountDownLatch _closeLatch = new CountDownLatch(1);
+        private final AtomicReference<InetSocketAddress> _clientAddress = new AtomicReference<>();
+
+        @Override
+        public void clientConnected(final InetSocketAddress clientAddress)
+        {
+            _clientAddress.set(clientAddress);
+        }
+
+        @Override
+        public void clientDisconnected(final InetSocketAddress clientAddress)
+        {
+            // ignore disconnects of clients other than the recorded one
+            if (clientAddress.equals(getClientAddress()))
+            {
+                _closeLatch.countDown();
+            }
+        }
+
+        /**
+         * Waits for the recorded client to be disconnected.
+         *
+         * @return true if the disconnect was reported within the given period
+         */
+        public boolean awaitDisconnect(int period, TimeUnit timeUnit) throws InterruptedException
+        {
+            return _closeLatch.await(period, timeUnit);
+        }
+
+        public InetSocketAddress getClientAddress()
+        {
+            return _clientAddress.get();
+        }
+    }
+
+ private class Producer implements Runnable
+ {
+ private final Runnable _runnable;
+ private final Session _session;
+ private final MessageProducer _messageProducer;
+ private final int _numberOfMessagesToInvokeRunnableAfter;
+ private final int _delay;
+ private volatile int _publishedMessageCounter;
+ private volatile Exception _exception;
+ private volatile Thread _thread;
+ private AtomicBoolean _closed = new AtomicBoolean();
+
+ public Producer(Connection connection, Destination queue, int
acknowledgeMode, int publishDelay,
+ int numberOfMessagesToInvokeRunnableAfter, Runnable
runnableToInvoke)
+ throws JMSException
+ {
+ _session = connection.createSession(acknowledgeMode ==
Session.SESSION_TRANSACTED, acknowledgeMode);
+ _messageProducer = _session.createProducer(queue);
+ _runnable = runnableToInvoke;
+ _numberOfMessagesToInvokeRunnableAfter =
numberOfMessagesToInvokeRunnableAfter;
+ _delay = publishDelay;
+ }
+
+ @Override
+ public void run()
+ {
+ _thread = Thread.currentThread();
+ try
+ {
+ Message message = _session.createMessage();
+ while (!_closed.get())
+ {
+ if (_publishedMessageCounter ==
_numberOfMessagesToInvokeRunnableAfter && _runnable != null)
+ {
+ _executorService.execute(_runnable);
+ }
+
+ message.setIntProperty(INDEX, _publishedMessageCounter);
+ _messageProducer.send(message);
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ }
+ LOGGER.debug("Produced message with index {}",
_publishedMessageCounter);
+ _publishedMessageCounter++;
+
+ if (_delay > 0 && !_closed.get())
+ {
+ synchronized (this)
+ {
+ this.wait(_delay);
+ }
+ }
+ }
+ LOGGER.debug("Stopping producer gracefully");
+ }
+ catch (Exception e)
+ {
+ LOGGER.debug("Stopping producer due to exception", e);
+ _exception = e;
+ }
+ }
+
+        /**
+         * Asks the publishing loop to finish and waits up to 2000 ms for the
+         * publishing thread to terminate. Only the first invocation has an effect.
+         */
+        public void stop()
+        {
+            if (_closed.compareAndSet(false, true))
+            {
+                // wake up the publisher if it is waiting out the publish delay
+                synchronized (this)
+                {
+                    this.notify();
+                }
+
+                final Thread publisherThread = _thread;
+                if (publisherThread != null)
+                {
+                    try
+                    {
+                        publisherThread.join(2000);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        publisherThread.interrupt();
+                        // restore the caller's interrupt status instead of swallowing it
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+ public int getNumberOfPublished()
+ {
+ return _publishedMessageCounter;
+ }
+
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ }
+
+ private class Consumer implements Runnable
+ {
+ private final Runnable _runnable;
+ private final Session _session;
+ private final MessageConsumer _messageConsumer;
+ private final int _numberOfMessagesToInvokeRunnableAfter;
+ private volatile int _consumedMessageCounter;
+ private volatile Exception _exception;
+ private volatile Thread _thread;
+ private AtomicBoolean _closed = new AtomicBoolean();
+ private volatile int _lastSeenMessageIndex;
+
+ public Consumer(Connection connection,
+ Destination queue,
+ int acknowledgeMode,
+ int numberOfMessagesToInvokeRunnableAfter,
+ Runnable runnableToInvoke)
+ throws JMSException
+ {
+ _session = connection.createSession(acknowledgeMode ==
Session.SESSION_TRANSACTED, acknowledgeMode);
+ _messageConsumer = _session.createConsumer(queue);
+ _runnable = runnableToInvoke;
+ _numberOfMessagesToInvokeRunnableAfter =
numberOfMessagesToInvokeRunnableAfter;
+ }
+
+ @Override
+ public void run()
+ {
+ _thread = Thread.currentThread();
+ try
+ {
+ while (!_closed.get())
+ {
+ if (_consumedMessageCounter ==
_numberOfMessagesToInvokeRunnableAfter && _runnable != null)
+ {
+ _executorService.execute(_runnable);
+ }
+
+ Message message =
_messageConsumer.receive(RECEIVE_TIMEOUT);
+ if (message != null)
+ {
+ int messageIndex = message.getIntProperty(INDEX);
+ _lastSeenMessageIndex = messageIndex;
+ LOGGER.debug("Received message with index {}, expected
index {}",
+ messageIndex,
+ _consumedMessageCounter);
+ assertEquals("Unexpected message index",
+ _consumedMessageCounter,
+ messageIndex);
+
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ LOGGER.debug("Committed message with index {}",
messageIndex);
+ }
+ _consumedMessageCounter++;
+ }
+ }
+ LOGGER.debug("Stopping consumer gracefully");
+ }
+ catch (Exception e)
+ {
+ LOGGER.debug("Stopping consumer due to exception, number of
consumed {}", _consumedMessageCounter, e);
+ _exception = e;
+ }
+ }
+
+        /**
+         * Asks the consuming loop to finish and waits up to 2000 ms for the
+         * consuming thread to terminate. Only the first invocation has an effect.
+         */
+        public void stop()
+        {
+            if (_closed.compareAndSet(false, true))
+            {
+                final Thread consumerThread = _thread;
+                if (consumerThread != null)
+                {
+                    try
+                    {
+                        consumerThread.join(2000);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        consumerThread.interrupt();
+                        // restore the caller's interrupt status instead of swallowing it
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+ public int getNumberOfConsumed()
+ {
+ return _consumedMessageCounter;
+ }
+
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ public int getLastSeenMessageIndex()
+ {
+ return _lastSeenMessageIndex;
+ }
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]