Author: orudyy
Date: Mon Nov 23 20:16:59 2015
New Revision: 1715940
URL: http://svn.apache.org/viewvc?rev=1715940&view=rev
Log:
QPID-6869: Address review comments from Lorenz Quack
Modified:
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
Modified:
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1715940&r1=1715939&r2=1715940&view=diff
==============================================================================
---
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
(original)
+++
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
Mon Nov 23 20:16:59 2015
@@ -48,13 +48,12 @@ public class TCPTunneler
private final TCPWorker _tcpWorker;
private final ExecutorService _executor;
- public TCPTunneler(final String targetHost,
- final int targetPort,
- final int proxyPort,
+ public TCPTunneler(final int localHost, final String remotetHost,
+ final int remotePort,
final int numberOfConcurrentClients)
{
_executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2
+ 1);
- _tcpWorker = new TCPWorker(proxyPort, targetHost, targetPort,
_executor);
+ _tcpWorker = new TCPWorker(localHost, remotetHost, remotePort,
_executor);
}
public void start() throws IOException
@@ -93,32 +92,19 @@ public class TCPTunneler
}
}
- interface TunnelListener
+ public interface TunnelListener
{
void clientConnected(InetSocketAddress clientAddress);
void clientDisconnected(InetSocketAddress clientAddress);
}
- public static class NoopTunnelListener implements TunnelListener
- {
- @Override
- public void clientConnected(final InetSocketAddress clientAddress)
- {
- }
-
- @Override
- public void clientDisconnected(final InetSocketAddress clientAddress)
- {
- }
- }
-
public static class TCPWorker implements Runnable
{
- private final String _targetHost;
- private final int _targetPort;
+ private final String _remoteHost;
+ private final int _remotePort;
private final int _localPort;
- private final String _hostPort;
+ private final String _remoteHostPort;
private final AtomicBoolean _closed;
private final Collection<SocketTunnel> _tunnels;
private final Collection<TunnelListener> _tunnelListeners;
@@ -127,19 +113,19 @@ public class TCPTunneler
private volatile ExecutorService _executor;
public TCPWorker(final int localPort,
- final String targetHost,
- final int targetPort,
+ final String remoteHost,
+ final int remotePort,
final ExecutorService executor)
{
_closed = new AtomicBoolean();
- _targetHost = targetHost;
- _targetPort = targetPort;
+ _remoteHost = remoteHost;
+ _remotePort = remotePort;
_localPort = localPort;
- _hostPort = _targetHost + ":" + _targetPort;
+ _remoteHostPort = _remoteHost + ":" + _remotePort;
_executor = executor;
_tunnels = new CopyOnWriteArrayList<>();
_tunnelListeners = new CopyOnWriteArrayList<>();
- _notifyingListener = new NoopTunnelListener()
+ _notifyingListener = new TunnelListener()
{
@Override
public void clientConnected(final InetSocketAddress
clientAddress)
@@ -171,10 +157,10 @@ public class TCPTunneler
Thread.currentThread().setName("TCPTunnelerAcceptingThread");
while (!_closed.get())
{
- Socket clientSocket = _serverSocket.accept();
- LOGGER.debug("Client opened socket {}", clientSocket);
+ Socket acceptedSocket = _serverSocket.accept();
+ LOGGER.debug("Client opened socket {}", acceptedSocket);
- createTunnel(clientSocket);
+ createTunnel(acceptedSocket);
}
}
catch (IOException e)
@@ -194,7 +180,7 @@ public class TCPTunneler
public void start()
{
- LOGGER.info("Starting TCPTunneler forwarding from port {} to {}",
_localPort, _hostPort);
+ LOGGER.info("Starting TCPTunneler forwarding from port {} to {}",
_localPort, _remoteHostPort);
try
{
_serverSocket = new ServerSocket(_localPort);
@@ -233,7 +219,7 @@ public class TCPTunneler
{
LOGGER.info("Stopping TCPTunneler forwarding from port {} to
{}",
_localPort,
- _hostPort);
+ _remoteHostPort);
try
{
for (SocketTunnel tunnel : _tunnels)
@@ -248,7 +234,7 @@ public class TCPTunneler
LOGGER.info("TCPTunneler forwarding from port {} to {} is
stopped",
_localPort,
- _hostPort);
+ _remoteHostPort);
}
}
@@ -288,24 +274,24 @@ public class TCPTunneler
}
- private void createTunnel(final Socket clientSocket)
+ private void createTunnel(final Socket localSocket)
{
- Socket serverSocket = null;
+ Socket remoteSocket = null;
try
{
- LOGGER.debug("Opening socket to {} for {}", _hostPort,
clientSocket);
- serverSocket = new Socket(_targetHost, _targetPort);
- LOGGER.debug("Opened socket to {} for {}", serverSocket,
clientSocket);
- SocketTunnel tunnel = new SocketTunnel(clientSocket,
serverSocket, _notifyingListener);
- LOGGER.debug("Socket tunnel is created from {} to {}",
clientSocket, serverSocket);
+ LOGGER.debug("Opening socket to {} for {}", _remoteHostPort,
localSocket);
+ remoteSocket = new Socket(_remoteHost, _remotePort);
+ LOGGER.debug("Opened socket to {} for {}", remoteSocket,
localSocket);
+ SocketTunnel tunnel = new SocketTunnel(localSocket,
remoteSocket, _notifyingListener);
+ LOGGER.debug("Socket tunnel is created from {} to {}",
localSocket, remoteSocket);
_tunnels.add(tunnel);
tunnel.start(_executor);
}
catch (Exception e)
{
- LOGGER.error("Cannot forward i/o traffic between {} and {}",
clientSocket, _hostPort, e);
- SocketTunnel.closeSocket(clientSocket);
- SocketTunnel.closeSocket(serverSocket);
+ LOGGER.error("Cannot forward i/o traffic between {} and {}",
localSocket, _remoteHostPort, e);
+ SocketTunnel.closeSocket(localSocket);
+ SocketTunnel.closeSocket(remoteSocket);
}
}
@@ -386,8 +372,8 @@ public class TCPTunneler
private final Socket _serverSocket;
private final TunnelListener _tunnelListener;
private final AtomicBoolean _closed;
- private final ClosableStreamForwarder _inputStreamForwarder;
- private final ClosableStreamForwarder _outputStreamForwarder;
+ private final AutoClosingStreamForwarder _upStreamForwarder;
+ private final AutoClosingStreamForwarder _downStreamForwarder;
private final InetSocketAddress _clientSocketAddress;
public SocketTunnel(final Socket clientSocket,
@@ -402,8 +388,8 @@ public class TCPTunneler
_tunnelListener = tunnelListener;
_clientSocket.setKeepAlive(true);
_serverSocket.setKeepAlive(true);
- _inputStreamForwarder = new ClosableStreamForwarder(new
StreamForwarder(_clientSocket, _serverSocket));
- _outputStreamForwarder = new ClosableStreamForwarder(new
StreamForwarder(_serverSocket, _clientSocket));
+ _upStreamForwarder = new AutoClosingStreamForwarder(new
StreamForwarder(_clientSocket, _serverSocket));
+ _downStreamForwarder = new AutoClosingStreamForwarder(new
StreamForwarder(_serverSocket, _clientSocket));
}
public void close()
@@ -424,8 +410,8 @@ public class TCPTunneler
public void start(Executor executor) throws IOException
{
- executor.execute(_inputStreamForwarder);
- executor.execute(_outputStreamForwarder);
+ executor.execute(_upStreamForwarder);
+ executor.execute(_downStreamForwarder);
_tunnelListener.clientConnected(getClientAddress());
}
@@ -461,11 +447,11 @@ public class TCPTunneler
}
- private class ClosableStreamForwarder implements Runnable
+ private class AutoClosingStreamForwarder implements Runnable
{
private StreamForwarder _streamForwarder;
- public ClosableStreamForwarder(StreamForwarder streamForwarder)
+ public AutoClosingStreamForwarder(StreamForwarder streamForwarder)
{
_streamForwarder = streamForwarder;
}
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java?rev=1715940&r1=1715939&r2=1715940&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
Mon Nov 23 20:16:59 2015
@@ -72,7 +72,7 @@ public class AbruptClientDisconnectTest
// create queue
consumeIgnoringLastSeenOmission(_utilityConnection, _testQueue, 1, 0,
-1);
- _tcpTunneler = new TCPTunneler("localhost", getPort(),
getFailingPort(), 1);
+ _tcpTunneler = new TCPTunneler(getFailingPort(), "localhost",
getPort(), 1);
_tcpTunneler.start();
}
@@ -112,7 +112,7 @@ public class AbruptClientDisconnectTest
final ClientMonitor clientMonitor = new ClientMonitor();
_tunneledConnection = createTunneledConnection(clientMonitor);
Producer producer =
- new Producer(_tunneledConnection, _testQueue,
Session.SESSION_TRANSACTED, 0, 10, new Runnable()
+ new Producer(_tunneledConnection, _testQueue,
Session.SESSION_TRANSACTED, 10, new Runnable()
{
@Override
public void run()
@@ -142,7 +142,6 @@ public class AbruptClientDisconnectTest
final Producer producer = new Producer(_utilityConnection,
_testQueue,
Session.SESSION_TRANSACTED,
- 0,
minimumNumberOfMessagesToProduce,
new Runnable()
{
@@ -258,7 +257,7 @@ public class AbruptClientDisconnectTest
session.close();
}
- private class ClientMonitor extends TCPTunneler.NoopTunnelListener
+ private class ClientMonitor implements TCPTunneler.TunnelListener
{
private final CountDownLatch _closeLatch = new CountDownLatch(1);
private final AtomicReference<InetSocketAddress> _clientAddress = new
AtomicReference();
@@ -295,13 +294,12 @@ public class AbruptClientDisconnectTest
private final Session _session;
private final MessageProducer _messageProducer;
private final int _numberOfMessagesToInvokeRunnableAfter;
- private final int _delay;
private volatile int _publishedMessageCounter;
private volatile Exception _exception;
private volatile Thread _thread;
private AtomicBoolean _closed = new AtomicBoolean();
- public Producer(Connection connection, Destination queue, int
acknowledgeMode, int publishDelay,
+ public Producer(Connection connection, Destination queue, int
acknowledgeMode,
int numberOfMessagesToInvokeRunnableAfter, Runnable
runnableToInvoke)
throws JMSException
{
@@ -309,7 +307,6 @@ public class AbruptClientDisconnectTest
_messageProducer = _session.createProducer(queue);
_runnable = runnableToInvoke;
_numberOfMessagesToInvokeRunnableAfter =
numberOfMessagesToInvokeRunnableAfter;
- _delay = publishDelay;
}
@Override
@@ -334,14 +331,6 @@ public class AbruptClientDisconnectTest
}
LOGGER.debug("Produced message with index {}",
_publishedMessageCounter);
_publishedMessageCounter++;
-
- if (_delay > 0 && !_closed.get())
- {
- synchronized (this)
- {
- this.wait(_delay);
- }
- }
}
LOGGER.debug("Stopping producer gracefully");
}
@@ -356,11 +345,6 @@ public class AbruptClientDisconnectTest
{
if (_closed.compareAndSet(false, true))
{
- synchronized (this)
- {
- this.notify();
- }
-
if (_thread != null)
{
try
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]