Author: peter_firmstone Date: Fri Dec 11 07:43:40 2015 New Revision: 1719272
URL: http://svn.apache.org/viewvc?rev=1719272&view=rev Log: I was able to cause test failures in MuxStartTimoutTest occasionally on multiple test runs, identical to those on Jenkins. The test thread was found to be still waiting in Mux.start(). Mux start timeout robustness improvements, minimized sync block scope and checked conditions immediately after notify, while monitor is held. Refactored some unchecked casts while fixing JERI mux. Modified: river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java river/jtsk/trunk/src/org/apache/river/action/GetLongAction.java river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/Mux.java river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java river/jtsk/trunk/src/org/apache/river/thread/GetThreadPoolAction.java river/jtsk/trunk/test/src/org/apache/river/jeri/internal/mux/MuxStartTimeoutTest.java Modified: river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java?rev=1719272&r1=1719271&r2=1719272&view=diff ============================================================================== --- river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java (original) +++ river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java Fri Dec 11 07:43:40 2015 @@ -122,16 +122,16 @@ public final class ConnectionManager { * How long to leave idle muxes around before closing them. */ private static final long TIMEOUT - = ((Long) AccessController.doPrivileged(new GetLongAction( + = AccessController.doPrivileged(new GetLongAction( "org.apache.river.jeri.connectionTimeout", - 15000))).longValue(); + 15000)).longValue(); /** * How long to wait for a server to respond to an initial client message. */ private static final long HANDSHAKE_TIMEOUT - = ((Long) AccessController.doPrivileged(new GetLongAction( + = AccessController.doPrivileged(new GetLongAction( "org.apache.river.jeri.handshakeTimeout", - 15000))).longValue(); + 15000)).longValue(); /** * ConnectionManager logger. */ @@ -141,7 +141,7 @@ public final class ConnectionManager { * Executor that executes tasks in pooled system threads. */ private static final Executor systemThreadPool - = (Executor) AccessController.doPrivileged( + = AccessController.doPrivileged( new GetThreadPoolAction(false)); /** * Set of connection managers with open or pending muxes (connections), for @@ -252,9 +252,9 @@ public final class ConnectionManager { * removes the mux and adds it to the idle list. Returns true if no connects * are pending and no muxes remain. */ - synchronized boolean checkIdle(long now, List idle) { + synchronized boolean checkIdle(long now, List<OutboundMux> idle) { for (int i = muxes.size(); --i >= 0;) { - OutboundMux mux = (OutboundMux) muxes.get(i); + OutboundMux mux = muxes.get(i); if (mux.checkIdle(now)) { muxes.remove(i); idle.add(mux); @@ -282,7 +282,6 @@ public final class ConnectionManager { try { mux = (c.getChannel() == null) ? new OutboundMux(this, c) : new OutboundMux(this, c, true); - mux.setStartTimeout(HANDSHAKE_TIMEOUT); } finally { if (mux == null) { try { @@ -332,7 +331,7 @@ public final class ConnectionManager { * Constructs an instance from the connection's streams. */ OutboundMux(ConnectionManager manager, Connection c) throws IOException { - super(c.getOutputStream(), c.getInputStream()); + super(c.getOutputStream(), c.getInputStream(), HANDSHAKE_TIMEOUT); this.c = c; this.manager = manager; } @@ -341,7 +340,7 @@ public final class ConnectionManager { * Constructs an instance from the connection's channel. */ OutboundMux(ConnectionManager manager, Connection c, boolean ignore) throws IOException { - super(c.getChannel()); + super(c.getChannel(), HANDSHAKE_TIMEOUT); this.c = c; this.manager = manager; } Modified: river/jtsk/trunk/src/org/apache/river/action/GetLongAction.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/action/GetLongAction.java?rev=1719272&r1=1719271&r2=1719272&view=diff ============================================================================== --- river/jtsk/trunk/src/org/apache/river/action/GetLongAction.java (original) +++ river/jtsk/trunk/src/org/apache/river/action/GetLongAction.java Fri Dec 11 07:43:40 2015 @@ -71,7 +71,7 @@ import net.jini.security.Security; * @see Security * @since 2.0 **/ -public class GetLongAction implements PrivilegedAction { +public class GetLongAction implements PrivilegedAction<Long> { private static final Logger logger = Logger.getLogger("org.apache.river.action.GetLongAction"); @@ -121,7 +121,7 @@ public class GetLongAction implements Pr * @return a <code>Long</code> representing the value of the * system property or the default value, or <code>null</code> **/ - public Object run() { + public Long run() { try { Long value = Long.getLong(theProp); if (value != null) { Modified: river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/Mux.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/Mux.java?rev=1719272&r1=1719271&r2=1719272&view=diff ============================================================================== --- river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/Mux.java (original) +++ river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/Mux.java Fri Dec 11 07:43:40 2015 @@ -83,7 +83,7 @@ abstract class Mux { * used for shutting down sessions when a connection goes down */ private static final Executor systemThreadPool = - (Executor) AccessController.doPrivileged( + AccessController.doPrivileged( new GetThreadPoolAction(false)); /** session shutdown tasks to be executed asynchronously */ @@ -125,11 +125,13 @@ abstract class Mux { /** lock guarding all mutable instance state (below) */ final Object muxLock = new Object(); - int initialOutboundRation; // set from remote connection header - private boolean clientConnectionReady = false; // server header received + int initialOutboundRation; // set from remote connection + // volatile reads, sync writes on muxLock + private volatile boolean clientConnectionReady = false; // server header received boolean serverConnectionReady = false; // server header sent - boolean muxDown = false; + // volatile reads, sync writes on muxLock + volatile boolean muxDown = false; String muxDownMessage; Throwable muxDownCause; @@ -138,15 +140,14 @@ abstract class Mux { private int expectedPingCookie = -1; - /** unguarded instance state */ - private volatile long startTimeout = 30000; // milliseconds + /** ONLY USED BY CLIENT */ + private final long startTimeout; // milliseconds /** * Constructs a new Mux instance for a connection accessible through * standard (blocking) I/O streams. */ - Mux(OutputStream out, InputStream in, - int role, int initialInboundRation, int maxFragmentSize) + Mux(OutputStream out, InputStream in, int role, int initialInboundRation, int maxFragmentSize, long handshakeTimeout) throws IOException { this.role = role; @@ -160,10 +161,10 @@ abstract class Mux { this.connectionIO = new StreamConnectionIO(this, out, in); directBuffersUseful = false; + startTimeout = handshakeTimeout; } - Mux(SocketChannel channel, - int role, int initialInboundRation, int maxFragmentSize) + Mux(SocketChannel channel, int role, int initialInboundRation, int maxFragmentSize, long handshakeTimeout) throws IOException { this.role = role; @@ -177,6 +178,7 @@ abstract class Mux { this.connectionIO = new SocketChannelConnectionIO(this, channel); directBuffersUseful = true; + startTimeout = handshakeTimeout; } /** @@ -191,11 +193,11 @@ abstract class Mux { * @param timeout * positive value in milliseconds */ - public void setStartTimeout(long timeout) { - if (timeout <= 0) - throw new IllegalArgumentException("start timeout must be a positive number of milliseconds"); - this.startTimeout = timeout; - } +// public void setStartTimeout(long timeout) { +// if (timeout <= 0) +// throw new IllegalArgumentException("start timeout must be a positive number of milliseconds"); +// this.startTimeout = timeout; +// } /** * Starts I/O processing. @@ -221,25 +223,26 @@ abstract class Mux { if (role == CLIENT) { asyncSendClientConnectionHeader(); - synchronized (muxLock) { - long now = System.currentTimeMillis(); - long endTime = now + this.startTimeout; - while (!muxDown && !clientConnectionReady) { - if (now > endTime) { - setDown("timeout waiting for server to respond to handshake", null); - } else { - try { - muxLock.wait(endTime - now); - now = System.currentTimeMillis(); - } catch (InterruptedException e) { - setDown("interrupt waiting for connection header", e); - } + long now = System.currentTimeMillis(); + long endTime = now + this.startTimeout; + while (!muxDown && !clientConnectionReady) { + try { + synchronized (muxLock){ + muxLock.wait(endTime - now); + if (clientConnectionReady) return; + if (muxDown) throw new IOException(muxDownMessage, muxDownCause); } - } - if (muxDown) { - throw new IOException(muxDownMessage, muxDownCause); - } - } + now = System.currentTimeMillis(); + if (now < endTime) continue; + String message = "timeout waiting for server to respond to handshake"; + setDown(message, null); + throw new IOException(message, null); + } catch (InterruptedException e) { + String message = "interrupt waiting for connection header"; + setDown(message, e); + throw new IOException(message, e); + } + } } } @@ -294,8 +297,8 @@ abstract class Mux { */ final void setDown(final String message, final Throwable cause) { SessionShutdownTask sst = null; + if (muxDown) return; synchronized (muxLock) { - if (muxDown) return; muxDown = true; muxDownMessage = message; muxDownCause = cause; Modified: river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java?rev=1719272&r1=1719271&r2=1719272&view=diff ============================================================================== --- river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java (original) +++ river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java Fri Dec 11 07:43:40 2015 @@ -47,13 +47,14 @@ public class MuxClient extends Mux { * @param out the output stream of the underlying connection * * @param in the input stream of the underlying connection + * @param handshakeTimeout **/ - public MuxClient(OutputStream out, InputStream in) throws IOException { - super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024); + public MuxClient(OutputStream out, InputStream in, long handshakeTimeout) throws IOException { + super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024, handshakeTimeout); } - public MuxClient(SocketChannel channel) throws IOException { - super(channel, Mux.CLIENT, clientInitialInboundRation, 1024); + public MuxClient(SocketChannel channel, long handshakeTimeout) throws IOException { + super(channel, Mux.CLIENT, clientInitialInboundRation, 1024, handshakeTimeout); } /** Modified: river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java?rev=1719272&r1=1719271&r2=1719272&view=diff ============================================================================== --- river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java (original) +++ river/jtsk/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java Fri Dec 11 07:43:40 2015 @@ -46,16 +46,16 @@ public class MuxServer extends Mux { /** initial inbound ration as server, default is 32768 */ private static final int serverInitialInboundRation = - ((Integer) AccessController.doPrivileged(new GetIntegerAction( + AccessController.doPrivileged(new GetIntegerAction( "org.apache.river.jeri.connection.mux.server.initialInboundRation", - 32768))).intValue(); + 32768)).intValue(); /** * pool of threads for executing tasks with user code: used for * dispatching incoming requests to request dispatchers **/ private static final Executor userThreadPool = - (Executor) AccessController.doPrivileged( + AccessController.doPrivileged( new GetThreadPoolAction(true)); /** mux logger */ @@ -83,7 +83,7 @@ public class MuxServer extends Mux { RequestDispatcher requestDispatcher) throws IOException { - super(out, in, Mux.SERVER, serverInitialInboundRation, 1024); + super(out, in, Mux.SERVER, serverInitialInboundRation, 1024, 0L); this.requestDispatcher = requestDispatcher; this.securityContext = Security.getContext(); @@ -93,7 +93,7 @@ public class MuxServer extends Mux { RequestDispatcher requestDispatcher) throws IOException { - super(channel, Mux.SERVER, serverInitialInboundRation, 1024); + super(channel, Mux.SERVER, serverInitialInboundRation, 1024, 0L); this.requestDispatcher = requestDispatcher; this.securityContext = Security.getContext(); Modified: river/jtsk/trunk/src/org/apache/river/thread/GetThreadPoolAction.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/thread/GetThreadPoolAction.java?rev=1719272&r1=1719271&r2=1719272&view=diff ============================================================================== --- river/jtsk/trunk/src/org/apache/river/thread/GetThreadPoolAction.java (original) +++ river/jtsk/trunk/src/org/apache/river/thread/GetThreadPoolAction.java Fri Dec 11 07:43:40 2015 @@ -46,7 +46,7 @@ import java.security.PrivilegedAction; * * @author Sun Microsystems, Inc. **/ -public final class GetThreadPoolAction implements PrivilegedAction { +public final class GetThreadPoolAction implements PrivilegedAction<Executor> { /** pool of threads for executing tasks in system thread group */ private static final ThreadPool systemThreadPool = @@ -77,7 +77,7 @@ public final class GetThreadPoolAction i this.user = user; } - public Object run() { + public Executor run() { if (user){ getUserThreadPoolPermission.checkGuard(this); return userThreadPool; Modified: river/jtsk/trunk/test/src/org/apache/river/jeri/internal/mux/MuxStartTimeoutTest.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/test/src/org/apache/river/jeri/internal/mux/MuxStartTimeoutTest.java?rev=1719272&r1=1719271&r2=1719272&view=diff ============================================================================== --- river/jtsk/trunk/test/src/org/apache/river/jeri/internal/mux/MuxStartTimeoutTest.java (original) +++ river/jtsk/trunk/test/src/org/apache/river/jeri/internal/mux/MuxStartTimeoutTest.java Fri Dec 11 07:43:40 2015 @@ -58,7 +58,8 @@ public class MuxStartTimeoutTest { final AtomicBoolean finished = new AtomicBoolean(false); final AtomicBoolean succeeded = new AtomicBoolean(false); final AtomicBoolean failed = new AtomicBoolean(false); - final MuxClient muxClient = new MuxClient(os, is); + // Set mux client timeout in constructor, independant of any system properties. + final MuxClient muxClient = new MuxClient(os, is, 15000); try { Thread t = new Thread(new Runnable() { public void run() { @@ -71,9 +72,9 @@ public class MuxStartTimeoutTest { finished.set(true); } } - }); + }, "MuxStartTimeoutTest client.start"); t.start(); - t.join(35000); + t.join(20000); // 5 seconds grace should be sufficient. assertTrue(finished.get()); assertFalse(succeeded.get()); assertTrue(failed.get());
