Author: bdonlan
Date: 2005-05-27 15:22:09 -0400 (Fri, 27 May 2005)
New Revision: 739
Added:
trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java
Modified:
trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java
Log:
Make outgoing connections explicitly serialize all writer ops
Added: trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java
===================================================================
--- trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java 2005-05-27 18:59:35 UTC (rev 738)
+++ trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java 2005-05-27 19:22:09 UTC (rev 739)
@@ -0,0 +1,59 @@
+/*
+ * ObjectQueue.java
+ *
+ * Created on May 27, 2005, 3:03 PM
+ */
+
+package org.haverdev.haver.server;
+import java.util.*;
+
+/**
+ *
+ * @author bdonlan
+ */
+public class ObjectQueue {
+ LinkedList theQueue = new LinkedList();
+
+ /** Creates a new instance of ObjectQueue */
+ public ObjectQueue() {
+ }
+
+ public Object block(long millis) throws InterruptedException {
+ synchronized (theQueue) {
+ if (theQueue.isEmpty()) {
+ theQueue.wait(millis);
+ }
+ if (theQueue.isEmpty()) {
+ System.err.println("ObjectQueue: Awakened with nothing to show for it...");
+ return null;
+ }
+ Object o = theQueue.removeFirst();
+ return o;
+ }
+ }
+
+ public Object block() throws InterruptedException {
+ return block(0);
+ }
+
+ public boolean hasData() {
+ synchronized (theQueue) {
+ return !theQueue.isEmpty();
+ }
+ }
+
+ public Object poll() {
+ synchronized (theQueue) {
+ if (theQueue.isEmpty())
+ return null;
+ return theQueue.removeFirst();
+ }
+ }
+
+ public void post(Object o) {
+ synchronized (theQueue) {
+ theQueue.addLast(o);
+ theQueue.notify();
+ }
+ }
+}
Property changes on: trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java
===================================================================
--- trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java 2005-05-27 18:59:35 UTC (rev 738)
+++ trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java 2005-05-27 19:22:09 UTC (rev 739)
@@ -25,10 +25,10 @@
static Timer timer = new Timer(true);
Socket sock;
- NonblockingOutputStream nws;
+ OutputStream nws;
PrintWriter writer;
BufferedReader reader;
- Thread readThread;
+ Thread readThread, writeThread;
UserCommandHandler context;
UserEntity entity = null;
@@ -39,6 +39,17 @@
Integer pingLock = new Integer(0);
+ ObjectQueue outgoing = new ObjectQueue();
+
+ final class Message {
+ public String msg;
+ Message(String m) {
+ msg = m;
+ }
+ }
+
+ final class DisconnectMsg {}
+
protected void doPing() {
synchronized (pingLock) {
if (pingTimer != null) {
@@ -110,8 +121,7 @@
return;
}
sock = s;
- nws = new NonblockingOutputStream(new BufferedOutputStream(s.getOutputStream()));
- nws.setAutoFlush(true);
+ nws = new BufferedOutputStream(s.getOutputStream());
writer = new PrintWriter(new OutputStreamWriter(nws));
reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
@@ -120,9 +130,17 @@
inputLoop();
}
});
- readThread.setName(Misc.prettySocket(s));
+ readThread.setName(Misc.prettySocket(s) + " reader");
readThread.start();
+ writeThread = new Thread(new Runnable() {
+ public void run() {
+ outputLoop();
+ }
+ });
+ writeThread.setName(Misc.prettySocket(s) + " writer");
+ writeThread.start();
+
handshakeAbort = new TimerTask() {
public void run() {
synchronized (pingLock) {
@@ -139,6 +157,32 @@
public static final String[] greeting = {"HAVER", Misc.version};
+ public void outputLoop() {
+ try {
+ while (!sock.isClosed()) {
+ Object o;
+ try {
+ o = outgoing.block();
+ } catch (InterruptedException e) {
+ log.error("InterruptedException in outputLoop blocking", e);
+ continue;
+ }
+ if (o instanceof DisconnectMsg) {
+ writer.close();
+ sock.close();
+ return;
+ } else {
+ String s = (String)o;
+ writer.print(s);
+ }
+ if (!outgoing.hasData())
+ writer.flush();
+ }
+ } catch (IOException e) {
+ ioExcept(e);
+ }
+ }
+
public void inputLoop() {
try {
while (!sock.isClosed()) {
@@ -164,17 +208,7 @@
}
protected synchronized void cutConnection() {
- if (writer != null) {
- synchronized (writer) {
- try { writer.close(); } catch (Throwable t) { }
- try {
- Thread.sleep(1000); // Hopefully flush the writer
- // XXX: blocking close/flush
- } catch (InterruptedException e) {}
- try { sock.close(); } catch (Throwable t) { }
- writer = null;
- }
- }
+ outgoing.post(new DisconnectMsg());
synchronized (pingLock) {
if (pingTimer != null) pingTimer.cancel();
if (pongTimer != null) pongTimer.cancel();
@@ -202,12 +236,8 @@
public void sendLine(String[] args) {
String line = Misc.encodeLine(args);
- synchronized (writer) {
- if (writer == null) return;
- log.debug("S: " + line);
- writer.print(line);
- writer.flush();
- }
+ outgoing.post(line);
+ log.debug("S: " + line);
}
public synchronized void processLine(String line) throws PropagatedException {