Author: bdonlan
Date: 2005-05-27 15:22:09 -0400 (Fri, 27 May 2005)
New Revision: 739

Added:
   trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java
Modified:
   trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java
Log:
Make outgoing connections explicitly serialize all writer ops

Added: trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java
===================================================================
--- trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java 
2005-05-27 18:59:35 UTC (rev 738)
+++ trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java 
2005-05-27 19:22:09 UTC (rev 739)
@@ -0,0 +1,59 @@
+/*
+ * ObjectQueue.java
+ *
+ * Created on May 27, 2005, 3:03 PM
+ */
+
+package org.haverdev.haver.server;
+import java.util.*;
+
+/**
+ *
+ * @author bdonlan
+ */
+public class ObjectQueue {
+    LinkedList theQueue = new LinkedList();
+    
+    /** Creates a new instance of ObjectQueue */
+    public ObjectQueue() {
+    }
+    
+    public Object block(long millis) throws InterruptedException {
+        synchronized (theQueue) {
+            if (theQueue.isEmpty()) {
+                theQueue.wait(millis);
+            }
+            if (theQueue.isEmpty()) {
+                System.err.println("ObjectQueue: Awakened with nothing to show 
for it...");
+                return null;
+            }
+            Object o = theQueue.removeFirst();
+            return o;
+        }
+    }
+    
+    public Object block() throws InterruptedException {
+        return block(0);
+    }
+    
+    public boolean hasData() {
+        synchronized (theQueue) {
+            return !theQueue.isEmpty();
+        }
+    }
+    
+    public Object poll() {
+        synchronized (theQueue) {
+            if (theQueue.isEmpty())
+                return null;
+            return theQueue.removeFirst();
+        }
+    }
+    
+    public void post(Object o) {
+        synchronized (theQueue) {
+            theQueue.addLast(o);
+            theQueue.notify();
+        }
+    }
+}


Property changes on: 
trunk/clients/Javer2/src/org/haverdev/haver/server/ObjectQueue.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java
===================================================================
--- trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java      
2005-05-27 18:59:35 UTC (rev 738)
+++ trunk/clients/Javer2/src/org/haverdev/haver/server/UserConnection.java      
2005-05-27 19:22:09 UTC (rev 739)
@@ -25,10 +25,10 @@
     static Timer timer = new Timer(true);
     
     Socket sock;
-    NonblockingOutputStream nws;
+    OutputStream nws;
     PrintWriter writer;
     BufferedReader reader;
-    Thread readThread;
+    Thread readThread, writeThread;
     UserCommandHandler context;
     UserEntity entity = null;
     
@@ -39,6 +39,17 @@
     
     Integer pingLock = new Integer(0);
     
+    ObjectQueue outgoing = new ObjectQueue();
+    
+    final class Message {
+        public String msg;
+        Message(String m) {
+            msg = m;
+        }
+    }
+    
+    final class DisconnectMsg {}
+    
     protected void doPing() {
         synchronized (pingLock) {
             if (pingTimer != null) {
@@ -110,8 +121,7 @@
             return;
         }
         sock = s;
-        nws = new NonblockingOutputStream(new 
BufferedOutputStream(s.getOutputStream()));
-        nws.setAutoFlush(true);
+        nws = new BufferedOutputStream(s.getOutputStream());
         writer = new PrintWriter(new OutputStreamWriter(nws));
         reader = new BufferedReader(new 
InputStreamReader(sock.getInputStream()));
         
@@ -120,9 +130,17 @@
                 inputLoop();
             }
         });
-        readThread.setName(Misc.prettySocket(s));
+        readThread.setName(Misc.prettySocket(s) + " reader");
         readThread.start();
         
+        writeThread = new Thread(new Runnable() {
+            public void run() {
+                outputLoop();
+            }
+        });
+        writeThread.setName(Misc.prettySocket(s) + " writer");
+        writeThread.start();
+        
         handshakeAbort = new TimerTask() {
             public void run() {
                 synchronized (pingLock) {
@@ -139,6 +157,32 @@
     
     public static final String[] greeting = {"HAVER", Misc.version};
     
+    public void outputLoop() {
+        try {
+            while (!sock.isClosed()) {
+                Object o;
+                try {
+                    o = outgoing.block();
+                } catch (InterruptedException e) {
+                    log.error("InterruptedException in outputLoop blocking", 
e);
+                    continue;
+                }
+                if (o instanceof DisconnectMsg) {
+                    writer.close();
+                    sock.close();
+                    return;
+                } else {
+                    String s = (String)o;
+                    writer.print(s);
+                }
+                if (!outgoing.hasData())
+                    writer.flush();
+            }
+        } catch (IOException e) {
+            ioExcept(e);
+        }
+    }
+    
     public void inputLoop() {
         try {
             while (!sock.isClosed()) {
@@ -164,17 +208,7 @@
     }
     
     protected synchronized void cutConnection() {
-        if (writer != null) {
-            synchronized (writer) {
-                try { writer.close(); } catch (Throwable t) { }
-                try {
-                    Thread.sleep(1000); // Hopefully flush the writer
-                                        // XXX: blocking close/flush
-                } catch (InterruptedException e) {}
-                try { sock.close(); } catch (Throwable t) { }
-                writer = null;
-            }
-        }
+        outgoing.post(new DisconnectMsg());
         synchronized (pingLock) {
             if (pingTimer != null) pingTimer.cancel();
             if (pongTimer != null) pongTimer.cancel();
@@ -202,12 +236,8 @@
     
     public void sendLine(String[] args) {
         String line = Misc.encodeLine(args);
-        synchronized (writer) {
-            if (writer == null) return;
-            log.debug("S: " + line);
-            writer.print(line);
-            writer.flush();
-        }
+        outgoing.post(line);
+        log.debug("S: " + line);
     }
     
     public synchronized void processLine(String line) throws 
PropagatedException {


Reply via email to