Author: orudyy
Date: Mon Nov 23 20:16:59 2015
New Revision: 1715940

URL: http://svn.apache.org/viewvc?rev=1715940&view=rev
Log:
QPID-6869: Address review comments from Lorenz Quack

Modified:
    
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java

Modified: 
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1715940&r1=1715939&r2=1715940&view=diff
==============================================================================
--- 
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
 (original)
+++ 
qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
 Mon Nov 23 20:16:59 2015
@@ -48,13 +48,12 @@ public class TCPTunneler
     private final TCPWorker _tcpWorker;
     private final ExecutorService _executor;
 
-    public TCPTunneler(final String targetHost,
-                       final int targetPort,
-                       final int proxyPort,
+    public TCPTunneler(final int localHost, final String remotetHost,
+                       final int remotePort,
                        final int numberOfConcurrentClients)
     {
         _executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2 
+ 1);
-        _tcpWorker = new TCPWorker(proxyPort, targetHost, targetPort, 
_executor);
+        _tcpWorker = new TCPWorker(localHost, remotetHost, remotePort, 
_executor);
     }
 
     public void start() throws IOException
@@ -93,32 +92,19 @@ public class TCPTunneler
         }
     }
 
-    interface TunnelListener
+    public interface TunnelListener
     {
         void clientConnected(InetSocketAddress clientAddress);
 
         void clientDisconnected(InetSocketAddress clientAddress);
     }
 
-    public static class NoopTunnelListener implements TunnelListener
-    {
-        @Override
-        public void clientConnected(final InetSocketAddress clientAddress)
-        {
-        }
-
-        @Override
-        public void clientDisconnected(final InetSocketAddress clientAddress)
-        {
-        }
-    }
-
     public static class TCPWorker implements Runnable
     {
-        private final String _targetHost;
-        private final int _targetPort;
+        private final String _remoteHost;
+        private final int _remotePort;
         private final int _localPort;
-        private final String _hostPort;
+        private final String _remoteHostPort;
         private final AtomicBoolean _closed;
         private final Collection<SocketTunnel> _tunnels;
         private final Collection<TunnelListener> _tunnelListeners;
@@ -127,19 +113,19 @@ public class TCPTunneler
         private volatile ExecutorService _executor;
 
         public TCPWorker(final int localPort,
-                         final String targetHost,
-                         final int targetPort,
+                         final String remoteHost,
+                         final int remotePort,
                          final ExecutorService executor)
         {
             _closed = new AtomicBoolean();
-            _targetHost = targetHost;
-            _targetPort = targetPort;
+            _remoteHost = remoteHost;
+            _remotePort = remotePort;
             _localPort = localPort;
-            _hostPort = _targetHost + ":" + _targetPort;
+            _remoteHostPort = _remoteHost + ":" + _remotePort;
             _executor = executor;
             _tunnels = new CopyOnWriteArrayList<>();
             _tunnelListeners = new CopyOnWriteArrayList<>();
-            _notifyingListener = new NoopTunnelListener()
+            _notifyingListener = new TunnelListener()
             {
                 @Override
                 public void clientConnected(final InetSocketAddress 
clientAddress)
@@ -171,10 +157,10 @@ public class TCPTunneler
                 Thread.currentThread().setName("TCPTunnelerAcceptingThread");
                 while (!_closed.get())
                 {
-                    Socket clientSocket = _serverSocket.accept();
-                    LOGGER.debug("Client opened socket {}", clientSocket);
+                    Socket acceptedSocket = _serverSocket.accept();
+                    LOGGER.debug("Client opened socket {}", acceptedSocket);
 
-                    createTunnel(clientSocket);
+                    createTunnel(acceptedSocket);
                 }
             }
             catch (IOException e)
@@ -194,7 +180,7 @@ public class TCPTunneler
 
         public void start()
         {
-            LOGGER.info("Starting TCPTunneler forwarding from port {} to {}", 
_localPort, _hostPort);
+            LOGGER.info("Starting TCPTunneler forwarding from port {} to {}", 
_localPort, _remoteHostPort);
             try
             {
                 _serverSocket = new ServerSocket(_localPort);
@@ -233,7 +219,7 @@ public class TCPTunneler
             {
                 LOGGER.info("Stopping TCPTunneler forwarding from port {} to 
{}",
                             _localPort,
-                            _hostPort);
+                            _remoteHostPort);
                 try
                 {
                     for (SocketTunnel tunnel : _tunnels)
@@ -248,7 +234,7 @@ public class TCPTunneler
 
                 LOGGER.info("TCPTunneler forwarding from port {} to {} is 
stopped",
                             _localPort,
-                            _hostPort);
+                            _remoteHostPort);
             }
         }
 
@@ -288,24 +274,24 @@ public class TCPTunneler
         }
 
 
-        private void createTunnel(final Socket clientSocket)
+        private void createTunnel(final Socket localSocket)
         {
-            Socket serverSocket = null;
+            Socket remoteSocket = null;
             try
             {
-                LOGGER.debug("Opening socket to {} for {}", _hostPort, 
clientSocket);
-                serverSocket = new Socket(_targetHost, _targetPort);
-                LOGGER.debug("Opened socket to {} for {}", serverSocket, 
clientSocket);
-                SocketTunnel tunnel = new SocketTunnel(clientSocket, 
serverSocket, _notifyingListener);
-                LOGGER.debug("Socket tunnel is created from {} to {}", 
clientSocket, serverSocket);
+                LOGGER.debug("Opening socket to {} for {}", _remoteHostPort, 
localSocket);
+                remoteSocket = new Socket(_remoteHost, _remotePort);
+                LOGGER.debug("Opened socket to {} for {}", remoteSocket, 
localSocket);
+                SocketTunnel tunnel = new SocketTunnel(localSocket, 
remoteSocket, _notifyingListener);
+                LOGGER.debug("Socket tunnel is created from {} to {}", 
localSocket, remoteSocket);
                 _tunnels.add(tunnel);
                 tunnel.start(_executor);
             }
             catch (Exception e)
             {
-                LOGGER.error("Cannot forward i/o traffic between {} and {}", 
clientSocket, _hostPort, e);
-                SocketTunnel.closeSocket(clientSocket);
-                SocketTunnel.closeSocket(serverSocket);
+                LOGGER.error("Cannot forward i/o traffic between {} and {}", 
localSocket, _remoteHostPort, e);
+                SocketTunnel.closeSocket(localSocket);
+                SocketTunnel.closeSocket(remoteSocket);
             }
         }
 
@@ -386,8 +372,8 @@ public class TCPTunneler
         private final Socket _serverSocket;
         private final TunnelListener _tunnelListener;
         private final AtomicBoolean _closed;
-        private final ClosableStreamForwarder _inputStreamForwarder;
-        private final ClosableStreamForwarder _outputStreamForwarder;
+        private final AutoClosingStreamForwarder _upStreamForwarder;
+        private final AutoClosingStreamForwarder _downStreamForwarder;
         private final InetSocketAddress _clientSocketAddress;
 
         public SocketTunnel(final Socket clientSocket,
@@ -402,8 +388,8 @@ public class TCPTunneler
             _tunnelListener = tunnelListener;
             _clientSocket.setKeepAlive(true);
             _serverSocket.setKeepAlive(true);
-            _inputStreamForwarder = new ClosableStreamForwarder(new 
StreamForwarder(_clientSocket, _serverSocket));
-            _outputStreamForwarder = new ClosableStreamForwarder(new 
StreamForwarder(_serverSocket, _clientSocket));
+            _upStreamForwarder = new AutoClosingStreamForwarder(new 
StreamForwarder(_clientSocket, _serverSocket));
+            _downStreamForwarder = new AutoClosingStreamForwarder(new 
StreamForwarder(_serverSocket, _clientSocket));
         }
 
         public void close()
@@ -424,8 +410,8 @@ public class TCPTunneler
 
         public void start(Executor executor) throws IOException
         {
-            executor.execute(_inputStreamForwarder);
-            executor.execute(_outputStreamForwarder);
+            executor.execute(_upStreamForwarder);
+            executor.execute(_downStreamForwarder);
             _tunnelListener.clientConnected(getClientAddress());
         }
 
@@ -461,11 +447,11 @@ public class TCPTunneler
         }
 
 
-        private class ClosableStreamForwarder implements Runnable
+        private class AutoClosingStreamForwarder implements Runnable
         {
             private StreamForwarder _streamForwarder;
 
-            public ClosableStreamForwarder(StreamForwarder streamForwarder)
+            public AutoClosingStreamForwarder(StreamForwarder streamForwarder)
             {
                 _streamForwarder = streamForwarder;
             }

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java?rev=1715940&r1=1715939&r2=1715940&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
 Mon Nov 23 20:16:59 2015
@@ -72,7 +72,7 @@ public class AbruptClientDisconnectTest
         // create queue
         consumeIgnoringLastSeenOmission(_utilityConnection, _testQueue, 1, 0, 
-1);
 
-        _tcpTunneler = new TCPTunneler("localhost", getPort(), 
getFailingPort(), 1);
+        _tcpTunneler = new TCPTunneler(getFailingPort(), "localhost", 
getPort(), 1);
         _tcpTunneler.start();
     }
 
@@ -112,7 +112,7 @@ public class AbruptClientDisconnectTest
         final ClientMonitor clientMonitor = new ClientMonitor();
         _tunneledConnection = createTunneledConnection(clientMonitor);
         Producer producer =
-                new Producer(_tunneledConnection, _testQueue, 
Session.SESSION_TRANSACTED, 0, 10, new Runnable()
+                new Producer(_tunneledConnection, _testQueue, 
Session.SESSION_TRANSACTED, 10, new Runnable()
                 {
                     @Override
                     public void run()
@@ -142,7 +142,6 @@ public class AbruptClientDisconnectTest
         final Producer producer = new Producer(_utilityConnection,
                                                _testQueue,
                                                Session.SESSION_TRANSACTED,
-                                               0,
                                                
minimumNumberOfMessagesToProduce,
                                                new Runnable()
                                                {
@@ -258,7 +257,7 @@ public class AbruptClientDisconnectTest
         session.close();
     }
 
-    private class ClientMonitor extends TCPTunneler.NoopTunnelListener
+    private class ClientMonitor implements TCPTunneler.TunnelListener
     {
         private final CountDownLatch _closeLatch = new CountDownLatch(1);
         private final AtomicReference<InetSocketAddress> _clientAddress = new 
AtomicReference();
@@ -295,13 +294,12 @@ public class AbruptClientDisconnectTest
         private final Session _session;
         private final MessageProducer _messageProducer;
         private final int _numberOfMessagesToInvokeRunnableAfter;
-        private final int _delay;
         private volatile int _publishedMessageCounter;
         private volatile Exception _exception;
         private volatile Thread _thread;
         private AtomicBoolean _closed = new AtomicBoolean();
 
-        public Producer(Connection connection, Destination queue, int 
acknowledgeMode, int publishDelay,
+        public Producer(Connection connection, Destination queue, int 
acknowledgeMode,
                         int numberOfMessagesToInvokeRunnableAfter, Runnable 
runnableToInvoke)
                 throws JMSException
         {
@@ -309,7 +307,6 @@ public class AbruptClientDisconnectTest
             _messageProducer = _session.createProducer(queue);
             _runnable = runnableToInvoke;
             _numberOfMessagesToInvokeRunnableAfter = 
numberOfMessagesToInvokeRunnableAfter;
-            _delay = publishDelay;
         }
 
         @Override
@@ -334,14 +331,6 @@ public class AbruptClientDisconnectTest
                     }
                     LOGGER.debug("Produced message with index {}", 
_publishedMessageCounter);
                     _publishedMessageCounter++;
-
-                    if (_delay > 0 && !_closed.get())
-                    {
-                        synchronized (this)
-                        {
-                            this.wait(_delay);
-                        }
-                    }
                 }
                 LOGGER.debug("Stopping producer gracefully");
             }
@@ -356,11 +345,6 @@ public class AbruptClientDisconnectTest
         {
             if (_closed.compareAndSet(false, true))
             {
-                synchronized (this)
-                {
-                    this.notify();
-                }
-
                 if (_thread != null)
                 {
                     try



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to