chirino 2004/04/23 23:29:01
Modified:
modules/network/src/java/org/apache/geronimo/network/protocol/control
ControlClientProtocolKitchen.java
ControlClientProtocol.java
ControlServerProtocol.java
modules/network/src/java/org/apache/geronimo/network/protocol
SocketProtocol.java
Log:
Trying to workout some deadlocks.
Revision Changes Path
1.4 +25 -12
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocolKitchen.java
Index: ControlClientProtocolKitchen.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocolKitchen.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ControlClientProtocolKitchen.java 17 Mar 2004 03:11:59 -0000 1.3
+++ ControlClientProtocolKitchen.java 24 Apr 2004 06:29:01 -0000 1.4
@@ -20,8 +20,8 @@
import java.util.Collection;
import java.util.Iterator;
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.network.SelectorManager;
import org.apache.geronimo.network.protocol.AbstractProtocol;
import org.apache.geronimo.network.protocol.DownPacket;
@@ -33,24 +33,35 @@
import org.apache.geronimo.system.ClockPool;
import org.apache.geronimo.system.ThreadPool;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
/**
* @version $Revision$ $Date$
*/
class ControlClientProtocolKitchen extends ProtocolStack implements
ControlClientListener {
+ final private static Log log =
LogFactory.getLog(ControlClientProtocolKitchen.class);
+
private ClassLoader classLoader;
private ThreadPool threadPool;
private ClockPool clockPool;
private SelectorManager selectorManager;
- private Mutex sendMutex = new Mutex(); //todo: replace with something
that uses no locks
-
+ private Latch sendLatch = new Latch();
ControlClientProtocolKitchen() throws InterruptedException {
push(new Dummy());
- sendMutex.acquire();
}
-
+
+ /**
+ * @see
org.apache.geronimo.network.protocol.ProtocolStack#cloneProtocol()
+ */
+ public Protocol cloneProtocol() throws CloneNotSupportedException {
+ ControlClientProtocolKitchen p = (ControlClientProtocolKitchen)
super.cloneProtocol();
+ p.sendLatch = new Latch();
+ return p;
+ }
+
public ClassLoader getClassLoader() {
return classLoader;
}
@@ -84,7 +95,7 @@
}
public void serveUp(Collection menu) throws ControlException {
- System.out.println("serveUp");
+ log.trace("serveUp");
ControlContext context = new ControlContext();
context.setClassLoader(classLoader);
@@ -105,8 +116,9 @@
} catch (ProtocolException e) {
throw new ControlException(e);
}
-
- sendMutex.release();
+
+ log.trace("RELEASING send Latch: "+sendLatch);
+ sendLatch.release();
}
public void shutdown() {
@@ -114,9 +126,10 @@
public void sendDown(DownPacket packet) throws ProtocolException {
try {
- if (!sendMutex.attempt(1000 * 1000)) throw new
ProtocolException("Send timeout.");
+ log.trace("AQUIRING send Latch: "+sendLatch);
+ if (!sendLatch.attempt(1000 * 1000)) throw new
ProtocolException("Send timeout.");
+ log.trace("AQUIRED send Latch: "+sendLatch);
super.sendDown(packet);
- sendMutex.release();
} catch (InterruptedException e) {
throw new ProtocolException(e);
}
1.6 +24 -27
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocol.java
Index: ControlClientProtocol.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocol.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- ControlClientProtocol.java 10 Apr 2004 17:14:01 -0000 1.5
+++ ControlClientProtocol.java 24 Apr 2004 06:29:01 -0000 1.6
@@ -17,15 +17,15 @@
package org.apache.geronimo.network.protocol.control;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.geronimo.network.protocol.DownPacket;
+import org.apache.geronimo.network.protocol.Protocol;
import org.apache.geronimo.network.protocol.ProtocolException;
import org.apache.geronimo.network.protocol.UpPacket;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
/**
* @version $Revision$ $Date$
@@ -36,7 +36,7 @@
private ControlClientListener listener;
private ClassLoader classLoader;
- private Mutex sendMutex = new Mutex(); //todo: replace with something
that uses no locks
+ private Latch sendLatch = new Latch(); //todo: replace with something
that uses no locks
private Latch shutdownLatch = new Latch();
private long timeout;
@@ -44,6 +44,16 @@
private final int STOPPED = 1;
private int state = STOPPED;
+ /**
+ * @see
org.apache.geronimo.network.protocol.AbstractProtocol#cloneProtocol()
+ */
+ public Protocol cloneProtocol() throws CloneNotSupportedException {
+ ControlClientProtocol p =
(ControlClientProtocol)super.cloneProtocol();
+ p.sendLatch = new Latch();
+ p.shutdownLatch = new Latch();
+ return p;
+ }
+
public ControlClientListener getListener() {
return listener;
}
@@ -69,19 +79,9 @@
}
public void setup() throws ProtocolException {
- try {
- log.trace("Starting");
-
- getDownProtocol().sendDown(new BootRequestDownPacket()); //todo:
this is probably dangerous, put in thread pool
-
- log.trace("AQUIRING " + sendMutex);
- sendMutex.acquire();
- log.trace("AQUIRED " + sendMutex);
-
- state = STARTED;
- } catch (InterruptedException e) {
- throw new ProtocolException(e);
- }
+ log.trace("Starting");
+ getDownProtocol().sendDown(new BootRequestDownPacket()); //todo:
this is probably dangerous, put in thread pool
+ state = STARTED;
}
public void drain() throws ProtocolException {
@@ -110,9 +110,9 @@
log.trace("BOOT RESPONSE");
listener.serveUp(((BootResponseUpPacket) p).getMenu());
getDownProtocol().sendDown(new BootSuccessDownPacket());
- log.trace("RELEASING " + sendMutex);
- sendMutex.release();
- log.trace("RELEASED " + sendMutex);
+ log.trace("RELEASING " + sendLatch);
+ sendLatch.release();
+ log.trace("RELEASED " + sendLatch);
} catch (ControlException e) {
throw new ProtocolException(e);
}
@@ -135,18 +135,15 @@
public void sendDown(DownPacket packet) throws ProtocolException {
try {
- log.trace("AQUIRING " + sendMutex);
- if (!sendMutex.attempt(timeout)) throw new
ProtocolException("Send timeout");
- log.trace("AQUIRED " + sendMutex);
+ log.trace("AQUIRING " + sendLatch);
+ if (!sendLatch.attempt(timeout)) throw new
ProtocolException("Send timeout");
+ log.trace("AQUIRED " + sendLatch);
PassthroughDownPacket passthtough = new PassthroughDownPacket();
passthtough.setBuffers(packet.getBuffers());
getDownProtocol().sendDown(passthtough);
- log.trace("RELEASING " + sendMutex);
- sendMutex.release();
- log.trace("RELEASED " + sendMutex);
} catch (InterruptedException e) {
throw new ProtocolException(e);
}
1.5 +13 -20
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlServerProtocol.java
Index: ControlServerProtocol.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlServerProtocol.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ControlServerProtocol.java 10 Apr 2004 17:14:01 -0000 1.4
+++ ControlServerProtocol.java 24 Apr 2004 06:29:01 -0000 1.5
@@ -19,10 +19,8 @@
import java.util.Collection;
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.geronimo.network.SelectorManager;
import org.apache.geronimo.network.protocol.DownPacket;
import org.apache.geronimo.network.protocol.ProtocolException;
@@ -30,6 +28,8 @@
import org.apache.geronimo.system.ClockPool;
import org.apache.geronimo.system.ThreadPool;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
/**
* @version $Revision$ $Date$
@@ -43,7 +43,7 @@
private ThreadPool threadPool;
private ClockPool clockPool;
private SelectorManager selectorManager;
- private Mutex sendMutex = new Mutex(); //todo: replace with something
that uses no locks
+ private Latch sendLatch; //todo: replace with something that uses no
locks
private long timeout;
private final int STARTED = 0;
@@ -99,13 +99,9 @@
}
public void setup() throws ProtocolException {
- try {
- log.trace("Starting");
- sendMutex.acquire();
- state = STARTED;
- } catch (InterruptedException e) {
- throw new ProtocolException(e);
- }
+ log.trace("Starting");
+ sendLatch = new Latch();
+ state = STARTED;
}
public void drain() throws ProtocolException {
@@ -129,9 +125,9 @@
getDownProtocol().sendDown(constructBootPacket());
} else if (p instanceof BootSuccessUpPacket) {
log.trace("BOOT SUCCESS");
- log.trace("RELEASING " + sendMutex);
- sendMutex.release();
- log.trace("RELEASED " + sendMutex);
+ log.trace("RELEASING " + sendLatch);
+ sendLatch.release();
+ log.trace("RELEASED " + sendLatch);
} else if (p instanceof ShutdownRequestUpPacket) {
log.trace("SHUTDOWN_REQ");
getDownProtocol().sendDown(new ShutdownAcknowledgeDownPacket());
@@ -142,18 +138,15 @@
public void sendDown(DownPacket packet) throws ProtocolException {
try {
- log.trace("AQUIRING " + sendMutex);
- if (!sendMutex.attempt(timeout)) throw new
ProtocolException("Send timeout.");
- log.trace("AQUIRED " + sendMutex);
+ log.trace("AQUIRING " + sendLatch);
+ if (!sendLatch.attempt(timeout)) throw new
ProtocolException("Send timeout.");
+ log.trace("AQUIRED " + sendLatch);
PassthroughDownPacket passthtough = new PassthroughDownPacket();
passthtough.setBuffers(packet.getBuffers());
getDownProtocol().sendDown(passthtough);
- log.trace("RELEASING " + sendMutex);
- sendMutex.release();
- log.trace("RELEASED " + sendMutex);
} catch (InterruptedException e) {
throw new ProtocolException(e);
}
1.7 +28 -16
incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java
Index: SocketProtocol.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SocketProtocol.java 24 Apr 2004 04:07:13 -0000 1.6
+++ SocketProtocol.java 24 Apr 2004 06:29:01 -0000 1.7
@@ -65,6 +65,9 @@
ByteBuffer headerBuffer;
ByteBuffer bodyBuffer;
+ Object serviceReadMutex;
+ Object serviceWriteMutex;
+
static int nextConnectionId=0;
synchronized static int getNextConnectionId() {
return nextConnectionId++;
@@ -156,6 +159,8 @@
log =
LogFactory.getLog(SocketProtocol.class.getName()+":"+getNextConnectionId());
sendMutex = new Mutex();
headerBuffer = ByteBuffer.allocate(4);
+ serviceReadMutex = new Object();
+ serviceWriteMutex = new Object();
if (address == null && acceptedSocketChannel == null) throw new
IllegalStateException("No address set");
@@ -166,6 +171,7 @@
socketChannel.configureBlocking(true);
if (socketInterface != null)
socketChannel.socket().bind(socketInterface);
socketChannel.socket().setReuseAddress(true);
+ socketChannel.socket().setTcpNoDelay(true);
socketChannel.connect(address);
} catch (SocketException e) {
state = STOPPED;
@@ -244,21 +250,26 @@
}
}
- public synchronized void selectionEvent(SelectionKey selection) {
- synchronized (this) {
- try {
- if (selection.isWritable())
- serviceWrite();
- if (selection.isReadable())
- serviceRead();
- } catch (CancelledKeyException e) {
- // who knows, by the time we get here,
- // the key could have been canceled.
+ public void selectionEvent(SelectionKey selection) {
+ try {
+ if (selection.isReadable()) {
+ synchronized (serviceReadMutex) {
+ serviceRead();
+ }
}
+ if (selection.isWritable()) {
+ synchronized (serviceWriteMutex) {
+ serviceWrite();
+ }
+ }
+ } catch (CancelledKeyException e) {
+ log.trace("Key Cancelled:", e);
+ // who knows, by the time we get here,
+ // the key could have been canceled.
}
}
- synchronized private void serviceWrite() {
+ private void serviceWrite() {
log.trace("serviceWrite() triggered.");
try {
@@ -279,13 +290,14 @@
// release old buffers
sendBuffer = null;
+ log.trace("RELEASING " + sendMutex);
+ sendMutex.release();
+ log.trace("RELEASED " + sendMutex);
+
// We are done writing.
log.trace("OP_READ " + selectionKey);
selectorManager.setInterestOps(selectionKey,
SelectionKey.OP_READ, 0);
- log.trace("RELEASING " + sendMutex);
- sendMutex.release();
- log.trace("RELEASED " + sendMutex);
} catch (IOException e) {
log.debug("Communications error, closing connection: ", e);
close();
@@ -294,7 +306,7 @@
}
}
- synchronized public void serviceRead() {
+ public void serviceRead() {
boolean tracing = log.isTraceEnabled();
if (tracing) log.trace("serviceRead() triggered.");
lastUsed = System.currentTimeMillis();