Author: peter_firmstone
Date: Thu Nov 26 11:56:32 2015
New Revision: 1716613

URL: http://svn.apache.org/viewvc?rev=1716613&view=rev
Log:
Commit my local Jeri multiplexer stability improvements to assist with jtreg 
multiplexer nio tests:

net.jini.jeri.tcp.outOfThreads.OutOfThreads.java
net.jini.jeri.tcp.outOfThreads.OutOfThreads2.java

Modified:
    
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
    
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
    
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
    
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
    
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java

Modified: 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java?rev=1716613&r1=1716612&r2=1716613&view=diff
==============================================================================
--- 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
 (original)
+++ 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
 Thu Nov 26 11:56:32 2015
@@ -34,9 +34,7 @@ import java.nio.charset.CharsetEncoder;
 import java.security.AccessController;
 import java.util.BitSet;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -107,6 +105,7 @@ abstract class Mux {
 
        public void run() {
            for (int i = 0; i < sessions.length; i++) {
+               if (sessions[i] != null)
                sessions[i].setDown(message, cause);
            }
        }
@@ -135,7 +134,7 @@ abstract class Mux {
     Throwable muxDownCause;
 
     final BitSet busySessions = new BitSet();
-    final Map<Integer,Session> sessions = new HashMap<Integer,Session>(128);
+    final Session [] sessions = new Session[MAX_SESSION_ID + 1];
 
     private int expectedPingCookie = -1;
     
@@ -274,10 +273,15 @@ abstract class Mux {
        assert Thread.holdsLock(muxLock);
        assert !muxDown;
        assert !busySessions.get(sessionID);
-       assert sessions.get(Integer.valueOf(sessionID)) == null;
+//     assert sessions.get(Byte.valueOf(sessionID)) == null;
+       assert sessions[sessionID] == null;
 
        busySessions.set(sessionID);
-       sessions.put(Integer.valueOf(sessionID), session);
+//     Throwable t = new Throwable();
+//     System.out.println("Setting sessionID: "+ sessionID);
+//     t.printStackTrace(System.out);
+//     sessions.put(Byte.valueOf(sessionID), session);
+       sessions[sessionID] = session;
     }
 
     /**
@@ -285,14 +289,17 @@ abstract class Mux {
      * This method is intended to be invoked by this class and
      * subclasses only.
      *
-     * This method MAY be invoked while synchronized on muxLock.
+     * This method MAY be invoked while synchronized on muxLock if failure
+     * occurs during start up.
      */
     final void setDown(final String message, final Throwable cause) {
+       SessionShutdownTask sst = null;
        synchronized (muxLock) {
            if (muxDown) return;
            muxDown = true;
            muxDownMessage = message;
            muxDownCause = cause;
+           sst = new SessionShutdownTask(sessions.clone(), message, cause);
            muxLock.notifyAll();
        }
 
@@ -309,11 +316,8 @@ abstract class Mux {
              */
        boolean needWorker = false;
             synchronized (sessionShutdownQueue) {
-                if (!sessions.isEmpty()) {
-                    sessionShutdownQueue.add(new SessionShutdownTask(
-                        (Session[]) sessions.values().toArray(
-                            new Session[sessions.values().size()]),
-                        message, cause));
+                if (sst != null) {
+                    sessionShutdownQueue.add(sst);
                     needWorker = true;
                 } else {
                     needWorker = !sessionShutdownQueue.isEmpty();
@@ -360,7 +364,7 @@ abstract class Mux {
            }
            assert busySessions.get(sessionID);
            busySessions.clear(sessionID);
-           sessions.remove(Integer.valueOf(sessionID));
+           sessions[sessionID] = null;
        }
     }
 
@@ -1178,8 +1182,7 @@ abstract class Mux {
        getSession(sessionID).handleAcknowledgment();
     }
 
-    private void handleData(int sessionID, boolean open, boolean close,
-                           boolean eof, boolean ackRequired, ByteBuffer data)
+    private void handleData(int sessionID, boolean open, boolean close, 
boolean eof, boolean ackRequired, ByteBuffer data)
        throws ProtocolException
     {
        if (logger.isLoggable(Level.FINEST)) {
@@ -1219,7 +1222,7 @@ abstract class Mux {
                throw new ProtocolException(
                    "inactive sessionID: " + sessionID);
            }
-           return (Session) sessions.get(Integer.valueOf(sessionID));
+           return sessions[sessionID];
        }
     }
 

Modified: 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java?rev=1716613&r1=1716612&r2=1716613&view=diff
==============================================================================
--- 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
 (original)
+++ 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
 Thu Nov 26 11:56:32 2015
@@ -1,129 +1,129 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.river.jeri.internal.mux;
-
-import org.apache.river.action.GetIntegerAction;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.SocketChannel;
-import java.security.AccessController;
-import java.util.Collection;
-import net.jini.jeri.OutboundRequest;
-
-/**
- * A MuxClient controls the client side of multiplexed connection.
- *
- * @author Sun Microsystems, Inc.
- **/
-public class MuxClient extends Mux {
-
-    /** initial inbound ration as client, default is 32768 */
-    private static final int clientInitialInboundRation =
-       ((Integer) AccessController.doPrivileged(new GetIntegerAction(
-           "org.apache.river.jeri.connection.mux.client.initialInboundRation",
-           32768))).intValue();
-
-    /**
-     * Initiates the client side of the multiplexed connection over
-     * the given input/output stream pair.
-     *
-     * @param out the output stream of the underlying connection
-     *
-     * @param in the input stream of the underlying connection
-     **/
-    public MuxClient(OutputStream out, InputStream in) throws IOException {
-       super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
-    }
-
-    public MuxClient(SocketChannel channel) throws IOException {
-       super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
-    }
-
-    /**
-     * Starts a new request over this connection, returning the
-     * corresponding OutboundRequest object.
-     *
-     * @return the OutboundRequest for the newly created request
-     **/
-    public OutboundRequest newRequest()        throws IOException {
-       synchronized (muxLock) {
-           if (muxDown) {
-               IOException ioe = new IOException(muxDownMessage);
-               ioe.initCause(muxDownCause);
-               throw ioe;
-           }
-           int sessionID = busySessions.nextClearBit(0);
-           if (sessionID > Mux.MAX_SESSION_ID) {
-               throw new IOException("no free sessions");
-           }
-
-           Session session = new Session(this, sessionID, Session.CLIENT);
-           addSession(sessionID, session);
-           return session.getOutboundRequest();
-       }
-    }
-
-    /**
-     * Returns the current number of requests in progress over this
-     * connection.
-     *
-     * The value is guaranteed to not increase until the next
-     * invocation of the newRequest method.
-     *
-     * @return the number of requests in progress over this connection
-     *
-     * @throws IOException if the multiplexed connection is no longer
-     * active
-     **/
-    public int requestsInProgress() throws IOException {
-       synchronized (muxLock) {
-           if (muxDown) {
-               IOException ioe = new IOException(muxDownMessage);
-               ioe.initCause(muxDownCause);
-               throw ioe;
-           }
-           return busySessions.cardinality();
-       }
-    }
-
-    /**
-     * Shuts down this multiplexed connection.  Requests in progress
-     * will throw IOException for future I/O operations.
-     *
-     * @param message reason for shutdown to be included in
-     * IOExceptions thrown from future I/O operations
-     **/
-    public void shutdown(String message) {
-       synchronized (muxLock) {
-           setDown(message, null);
-       }
-    }
-
-    /**
-     * Populates the context collection with information representing
-     * this connection.
-     *
-     * This method should be overridden by subclasses to implement the
-     * desired behavior of the populateContext method for
-     * OutboundRequest instances generated for this connection.
-     **/
-    protected void populateContext(Collection context) {
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.jeri.internal.mux;
+
+import org.apache.river.action.GetIntegerAction;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.SocketChannel;
+import java.security.AccessController;
+import java.util.Collection;
+import net.jini.jeri.OutboundRequest;
+
+/**
+ * A MuxClient controls the client side of multiplexed connection.
+ *
+ * @author Sun Microsystems, Inc.
+ **/
+public class MuxClient extends Mux {
+
+    /** initial inbound ration as client, default is 32768 */
+    private static final int clientInitialInboundRation =
+       ((Integer) AccessController.doPrivileged(new GetIntegerAction(
+           "org.apache.river.jeri.connection.mux.client.initialInboundRation",
+           32768))).intValue();
+
+    /**
+     * Initiates the client side of the multiplexed connection over
+     * the given input/output stream pair.
+     *
+     * @param out the output stream of the underlying connection
+     *
+     * @param in the input stream of the underlying connection
+     **/
+    public MuxClient(OutputStream out, InputStream in) throws IOException {
+       super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
+    }
+
+    public MuxClient(SocketChannel channel) throws IOException {
+       super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
+    }
+
+    /**
+     * Starts a new request over this connection, returning the
+     * corresponding OutboundRequest object.
+     *
+     * @return the OutboundRequest for the newly created request
+     **/
+    public OutboundRequest newRequest()        throws IOException {
+       synchronized (muxLock) {
+           if (muxDown) {
+               IOException ioe = new IOException(muxDownMessage);
+               ioe.initCause(muxDownCause);
+               throw ioe;
+           }
+           byte sessionID = (byte) busySessions.nextClearBit(0);
+           if (sessionID > Mux.MAX_SESSION_ID) {
+               throw new IOException("no free sessions");
+           }
+
+           Session session = new Session(this, sessionID, Session.CLIENT);
+           addSession(sessionID, session);
+           return session.getOutboundRequest();
+       }
+    }
+
+    /**
+     * Returns the current number of requests in progress over this
+     * connection.
+     *
+     * The value is guaranteed to not increase until the next
+     * invocation of the newRequest method.
+     *
+     * @return the number of requests in progress over this connection
+     *
+     * @throws IOException if the multiplexed connection is no longer
+     * active
+     **/
+    public int requestsInProgress() throws IOException {
+       synchronized (muxLock) {
+           if (muxDown) {
+               IOException ioe = new IOException(muxDownMessage);
+               ioe.initCause(muxDownCause);
+               throw ioe;
+           }
+           return busySessions.cardinality();
+       }
+    }
+
+    /**
+     * Shuts down this multiplexed connection.  Requests in progress
+     * will throw IOException for future I/O operations.
+     *
+     * @param message reason for shutdown to be included in
+     * IOExceptions thrown from future I/O operations
+     **/
+    public void shutdown(String message) {
+       synchronized (muxLock) {
+           setDown(message, null);
+       }
+    }
+
+    /**
+     * Populates the context collection with information representing
+     * this connection.
+     *
+     * This method should be overridden by subclasses to implement the
+     * desired behavior of the populateContext method for
+     * OutboundRequest instances generated for this connection.
+     **/
+    protected void populateContext(Collection context) {
+    }
+}

Modified: 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java?rev=1716613&r1=1716612&r2=1716613&view=diff
==============================================================================
--- 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
 (original)
+++ 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
 Thu Nov 26 11:56:32 2015
@@ -1,308 +1,308 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.river.jeri.internal.mux;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Deque;
-import java.util.LinkedList;
-
-/**
- * Output stream returned by OutboundRequests and InboundRequests for
- * a session of a multiplexed connection.
- */
-class MuxInputStream extends InputStream {
-    private final Object sessionLock;
-    private final Session session;
-    private final Mux mux;
-    private final Deque<ByteBuffer> inBufQueue;
-    private IOException sessionDown = null;
-    private int inBufRemaining = 0;
-    private int inBufPos = 0;
-    private boolean inEOF = false;
-    private boolean inClosed = false;
-    private boolean sentAcknowledgment = false;
-
-    MuxInputStream(Mux mux, Session session, Object sessionLock) {
-        this.mux = mux;
-        this.session = session;
-        this.sessionLock = sessionLock;
-        this.inBufQueue = new LinkedList<ByteBuffer>();
-    }
-
-    void down(IOException e) {
-        sessionDown = e;
-    }
-
-    void appendToBufQueue(ByteBuffer data) {
-        inBufQueue.addLast(data);
-    }
-
-    @Override
-    public int read() throws IOException {
-        synchronized (sessionLock) {
-            if (inClosed) {
-                throw new IOException("stream closed");
-            }
-            while (inBufRemaining == 0 && sessionDown == null && 
session.getInState() <= Session.OPEN && !inClosed) {
-                if (session.getInState() == Session.IDLE) {
-                    assert session.getOutState() == Session.IDLE;
-                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
session.sessionID, null);
-                    session.setOutState(Session.OPEN);
-                    session.setInState(Session.OPEN);
-                }
-                if (!session.inRationInfinite && session.getInRation() == 0) {
-                    int inc = mux.initialInboundRation;
-                    mux.asyncSendIncrementRation(session.sessionID, inc);
-                    session.setInRation(session.getInRation() + inc);
-                }
-                try {
-                    sessionLock.wait(); // REMIND: timeout?
-                } catch (InterruptedException e) {
-                    String message = "request I/O interrupted";
-                    session.setDown(message, e);
-                    throw wrap(message, e);
-                }
-            }
-            if (inClosed) {
-                throw new IOException("stream closed");
-            }
-            if (inBufRemaining == 0) {
-                if (inEOF) {
-                    return -1;
-                } else {
-                    if (session.getInState() == Session.TERMINATED) {
-                        throw new IOException("request aborted by remote 
endpoint");
-                    }
-                    assert sessionDown != null;
-                    throw sessionDown;
-                }
-            }
-            assert inBufQueue.size() > 0;
-            int result = -1;
-            while (result == -1) {
-                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
-                if (inBufPos < buf.limit()) {
-                    result = (buf.get() & 0xFF);
-                    inBufPos++;
-                    inBufRemaining--;
-                }
-                if (inBufPos == buf.limit()) {
-                    inBufQueue.removeFirst();
-                    inBufPos = 0;
-                }
-            }
-            if (!session.inRationInfinite) {
-                checkInboundRation();
-            }
-            return result;
-        }
-    }
-
-    private IOException wrap(String message, Exception e) {
-        Throwable t;
-        if (Session.traceSupression()) {
-            t = e;
-        } else {
-            t = e.fillInStackTrace();
-        }
-        return new IOException(message, t);
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (b == null) {
-            throw new NullPointerException();
-        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) 
> b.length) || ((off + len) < 0)) {
-            throw new IndexOutOfBoundsException();
-        }
-        synchronized (sessionLock) {
-            if (inClosed) {
-                throw new IOException("stream closed");
-            } else if (len == 0) {
-                /*
-                 * REMIND: What if
-                 *     - stream is at EOF?
-                 *     - session was aborted?
-                 */
-                return 0;
-            }
-            while (inBufRemaining == 0 && sessionDown == null && 
session.getInState() <= Session.OPEN && !inClosed) {
-                if (session.getInState() == Session.IDLE) {
-                    assert session.getOutState() == Session.IDLE;
-                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
session.sessionID, null);
-                    session.setOutState(Session.OPEN);
-                    session.setInState(Session.OPEN);
-                }
-                if (!session.inRationInfinite && session.getInRation() == 0) {
-                    int inc = mux.initialInboundRation;
-                    mux.asyncSendIncrementRation(session.sessionID, inc);
-                    session.setInRation(session.getInRation() + inc);
-                }
-                try {
-                    sessionLock.wait(); // REMIND: timeout?
-                } catch (InterruptedException e) {
-                    String message = "request I/O interrupted";
-                    session.setDown(message, e);
-                    throw wrap(message, e);
-                }
-            }
-            if (inClosed) {
-                throw new IOException("stream closed");
-            }
-            if (inBufRemaining == 0) {
-                if (inEOF) {
-                    return -1;
-                } else {
-                    if (session.getInState() == Session.TERMINATED) {
-                        throw new IOException("request aborted by remote 
endpoint");
-                    }
-                    assert sessionDown != null;
-                    throw sessionDown;
-                }
-            }
-            assert inBufQueue.size() > 0;
-            int remaining = len;
-            while (remaining > 0 && inBufRemaining > 0) {
-                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
-                if (inBufPos < buf.limit()) {
-                    int toCopy = Math.min(buf.limit() - inBufPos, remaining);
-                    buf.get(b, off, toCopy);
-                    inBufPos += toCopy;
-                    inBufRemaining -= toCopy;
-                    off += toCopy;
-                    remaining -= toCopy;
-                }
-                if (inBufPos == buf.limit()) {
-                    inBufQueue.removeFirst();
-                    inBufPos = 0;
-                }
-            }
-            if (!session.inRationInfinite) {
-                checkInboundRation();
-            }
-            return len - remaining;
-        }
-    }
-
-    /**
-     * Sends ration increment, if read drained buffers below
-     * a certain mark.
-     *
-     * This method must NOT be invoked if the inbound ration in
-     * infinite, and it must ONLY be invoked while synchronized on
-     * this session's lock.
-     *
-     * REMIND: The implementation of this action will be a
-     * significant area for performance tuning.
-     */
-    private void checkInboundRation() {
-        assert Thread.holdsLock(sessionLock);
-        assert !session.inRationInfinite;
-        if (session.getInState() >= Session.FINISHED) {
-            return;
-        }
-        int mark = mux.initialInboundRation / 2;
-        int used = inBufRemaining + session.getInRation();
-        if (used <= mark) {
-            int inc = mux.initialInboundRation - used;
-            mux.asyncSendIncrementRation(session.sessionID, inc);
-            session.setInRation(session.getInRation() + inc);
-        }
-    }
-
-    @Override
-    public int available() throws IOException {
-        synchronized (sessionLock) {
-            if (inClosed) {
-                throw new IOException("stream closed");
-            }
-            /*
-             * REMIND: What if
-             *     - stream is at EOF?
-             *     - session was aborted?
-             */
-            return inBufRemaining;
-        }
-    }
-
-    @Override
-    public void close() {
-        synchronized (sessionLock) {
-            if (inClosed) {
-                return;
-            }
-            inClosed = true;
-            inBufQueue.clear(); // allow GC of unread data
-            if (session.role == Session.CLIENT && !sentAcknowledgment && 
session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
-                mux.asyncSendAcknowledgment(session.sessionID);
-                sentAcknowledgment = true;
-                /*
-                 * If removing this session from the connection's
-                 * table was delayed in order to be able to send
-                 * an Acknowledgment message, then take care of
-                 * removing it now.
-                 */
-                if (session.isRemoveLater()) {
-                    session.setOutState(Session.TERMINATED);
-                    mux.removeSession(session.sessionID);
-                    session.setRemoveLater(false);
-                }
-            }
-            sessionLock.notifyAll();
-        }
-    }
-
-    /**
-     * @return the sentAcknowledgment
-     */
-    boolean isSentAcknowledgment() {
-        return sentAcknowledgment;
-    }
-
-    /**
-     * @return the inBufRemaining
-     */
-    int getBufRemaining() {
-        return inBufRemaining;
-    }
-
-    /**
-     * @return the inClosed
-     */
-    boolean isClosed() {
-        return inClosed;
-    }
-
-    /**
-     * @param inBufRemaining the inBufRemaining to set
-     */
-    void setBufRemaining(int inBufRemaining) {
-        this.inBufRemaining = inBufRemaining;
-    }
-
-    /**
-     * @param inEOF the inEOF to set
-     */
-    void setEOF(boolean inEOF) {
-        this.inEOF = inEOF;
-    }
-    
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.jeri.internal.mux;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.LinkedList;
+
+/**
+ * Output stream returned by OutboundRequests and InboundRequests for
+ * a session of a multiplexed connection.
+ */
+class MuxInputStream extends InputStream {
+    private final Object sessionLock;
+    private final Session session;
+    private final Mux mux;
+    private final Deque<ByteBuffer> inBufQueue;
+    private IOException sessionDown = null;
+    private int inBufRemaining = 0;
+    private int inBufPos = 0;
+    private boolean inEOF = false;
+    private boolean inClosed = false;
+    private boolean sentAcknowledgment = false;
+
+    MuxInputStream(Mux mux, Session session, Object sessionLock) {
+        this.mux = mux;
+        this.session = session;
+        this.sessionLock = sessionLock;
+        this.inBufQueue = new LinkedList<ByteBuffer>();
+    }
+
+    void down(IOException e) {
+        sessionDown = e;
+    }
+
+    void appendToBufQueue(ByteBuffer data) {
+        inBufQueue.addLast(data);
+    }
+
+    @Override
+    public int read() throws IOException {
+        synchronized (sessionLock) {
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            while (inBufRemaining == 0 && sessionDown == null && 
session.getInState() <= Session.OPEN && !inClosed) {
+                if (session.getInState() == Session.IDLE) {
+                    assert session.getOutState() == Session.IDLE;
+                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
session.sessionID, null);
+                    session.setOutState(Session.OPEN);
+                    session.setInState(Session.OPEN);
+                }
+                if (!session.inRationInfinite && session.getInRation() == 0) {
+                    int inc = mux.initialInboundRation;
+                    mux.asyncSendIncrementRation(session.sessionID, inc);
+                    session.setInRation(session.getInRation() + inc);
+                }
+                try {
+                    sessionLock.wait(5000L); // REMIND: timeout?
+                } catch (InterruptedException e) {
+                    String message = "request I/O interrupted";
+                    session.setDown(message, e);
+                    throw wrap(message, e);
+                }
+            }
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            if (inBufRemaining == 0) {
+                if (inEOF) {
+                    return -1;
+                } else {
+                    if (session.getInState() == Session.TERMINATED) {
+                        throw new IOException("request aborted by remote 
endpoint");
+                    }
+                    assert sessionDown != null;
+                    throw sessionDown;
+                }
+            }
+            assert inBufQueue.size() > 0;
+            int result = -1;
+            while (result == -1) {
+                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
+                if (inBufPos < buf.limit()) {
+                    result = (buf.get() & 0xFF);
+                    inBufPos++;
+                    inBufRemaining--;
+                }
+                if (inBufPos == buf.limit()) {
+                    inBufQueue.removeFirst();
+                    inBufPos = 0;
+                }
+            }
+            if (!session.inRationInfinite) {
+                checkInboundRation();
+            }
+            return result;
+        }
+    }
+
+    private IOException wrap(String message, Exception e) {
+        Throwable t;
+        if (Session.traceSupression()) {
+            t = e;
+        } else {
+            t = e.fillInStackTrace();
+        }
+        return new IOException(message, t);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) 
> b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        }
+        synchronized (sessionLock) {
+            if (inClosed) {
+                throw new IOException("stream closed");
+            } else if (len == 0) {
+                /*
+                 * REMIND: What if
+                 *     - stream is at EOF?
+                 *     - session was aborted?
+                 */
+                return 0;
+            }
+            while (inBufRemaining == 0 && sessionDown == null && 
session.getInState() <= Session.OPEN && !inClosed) {
+                if (session.getInState() == Session.IDLE) {
+                    assert session.getOutState() == Session.IDLE;
+                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
session.sessionID, null);
+                    session.setOutState(Session.OPEN);
+                    session.setInState(Session.OPEN);
+                }
+                if (!session.inRationInfinite && session.getInRation() == 0) {
+                    int inc = mux.initialInboundRation;
+                    mux.asyncSendIncrementRation(session.sessionID, inc);
+                    session.setInRation(session.getInRation() + inc);
+                }
+                try {
+                    sessionLock.wait(5000L); // REMIND: timeout?
+                } catch (InterruptedException e) {
+                    String message = "request I/O interrupted";
+                    session.setDown(message, e);
+                    throw wrap(message, e);
+                }
+            }
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            if (inBufRemaining == 0) {
+                if (inEOF) {
+                    return -1;
+                } else {
+                    if (session.getInState() == Session.TERMINATED) {
+                        throw new IOException("request aborted by remote 
endpoint");
+                    }
+                    assert sessionDown != null;
+                    throw sessionDown;
+                }
+            }
+            assert inBufQueue.size() > 0;
+            int remaining = len;
+            while (remaining > 0 && inBufRemaining > 0) {
+                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
+                if (inBufPos < buf.limit()) {
+                    int toCopy = Math.min(buf.limit() - inBufPos, remaining);
+                    buf.get(b, off, toCopy);
+                    inBufPos += toCopy;
+                    inBufRemaining -= toCopy;
+                    off += toCopy;
+                    remaining -= toCopy;
+                }
+                if (inBufPos == buf.limit()) {
+                    inBufQueue.removeFirst();
+                    inBufPos = 0;
+                }
+            }
+            if (!session.inRationInfinite) {
+                checkInboundRation();
+            }
+            return len - remaining;
+        }
+    }
+
+    /**
+     * Sends ration increment, if read drained buffers below
+     * a certain mark.
+     *
+     * This method must NOT be invoked if the inbound ration in
+     * infinite, and it must ONLY be invoked while synchronized on
+     * this session's lock.
+     *
+     * REMIND: The implementation of this action will be a
+     * significant area for performance tuning.
+     */
+    private void checkInboundRation() {
+        assert Thread.holdsLock(sessionLock);
+        assert !session.inRationInfinite;
+        if (session.getInState() >= Session.FINISHED) {
+            return;
+        }
+        int mark = mux.initialInboundRation / 2;
+        int used = inBufRemaining + session.getInRation();
+        if (used <= mark) {
+            int inc = mux.initialInboundRation - used;
+            mux.asyncSendIncrementRation(session.sessionID, inc);
+            session.setInRation(session.getInRation() + inc);
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        synchronized (sessionLock) {
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            /*
+             * REMIND: What if
+             *     - stream is at EOF?
+             *     - session was aborted?
+             */
+            return inBufRemaining;
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (sessionLock) {
+            if (inClosed) {
+                return;
+            }
+            inClosed = true;
+            inBufQueue.clear(); // allow GC of unread data
+            if (session.role == Session.CLIENT && !sentAcknowledgment && 
session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
+                mux.asyncSendAcknowledgment(session.sessionID);
+                sentAcknowledgment = true;
+                /*
+                 * If removing this session from the connection's
+                 * table was delayed in order to be able to send
+                 * an Acknowledgment message, then take care of
+                 * removing it now.
+                 */
+                if (session.isRemoveLater()) {
+                    session.setOutState(Session.TERMINATED);
+                    mux.removeSession(session.sessionID);
+                    session.setRemoveLater(false);
+                }
+            }
+            sessionLock.notifyAll();
+        }
+    }
+
+    /**
+     * @return the sentAcknowledgment
+     */
+    boolean isSentAcknowledgment() {
+        return sentAcknowledgment;
+    }
+
+    /**
+     * @return the inBufRemaining
+     */
+    int getBufRemaining() {
+        return inBufRemaining;
+    }
+
+    /**
+     * @return the inClosed
+     */
+    boolean isClosed() {
+        return inClosed;
+    }
+
+    /**
+     * @param inBufRemaining the inBufRemaining to set
+     */
+    void setBufRemaining(int inBufRemaining) {
+        this.inBufRemaining = inBufRemaining;
+    }
+
+    /**
+     * @param inEOF the inEOF to set
+     */
+    void setEOF(boolean inEOF) {
+        this.inEOF = inEOF;
+    }
+    
+}

Modified: 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java?rev=1716613&r1=1716612&r2=1716613&view=diff
==============================================================================
--- 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
 (original)
+++ 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
 Thu Nov 26 11:56:32 2015
@@ -210,7 +210,7 @@ public class MuxServer extends Mux {
                dispatchNewRequest(sessionID);
                return;
            } else {
-               session = (Session) sessions.get(Integer.valueOf(sessionID));
+               session = sessions[sessionID];
                assert session != null;
            }
        }

Modified: 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java?rev=1716613&r1=1716612&r2=1716613&view=diff
==============================================================================
--- 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
 (original)
+++ 
river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
 Thu Nov 26 11:56:32 2015
@@ -1,434 +1,434 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.river.jeri.internal.mux;
-
-import org.apache.river.logging.Levels;
-import org.apache.river.thread.Executor;
-import org.apache.river.thread.GetThreadPoolAction;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.security.AccessController;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * StreamConnectionIO implements the ConnectionIO abstraction for a
- * connection accessible through standard (blocking) I/O streams, i.e.
- * java.io.OutputStream and java.io.InputStream.
- *
- * @author Sun Microsystems, Inc.
- **/
-final class StreamConnectionIO extends ConnectionIO {
-
-    private static final int RECEIVE_BUFFER_SIZE = 2048;
-
-    /**
-     * pool of threads for executing tasks in system thread group:
-     * used for I/O (reader and writer) threads
-     */
-    private static final Executor systemThreadPool =
-       (Executor) AccessController.doPrivileged(
-           new GetThreadPoolAction(false));
-
-    /** mux logger */
-    private static final Logger logger =
-       Logger.getLogger("net.jini.jeri.connection.mux");
-
-    /** I/O streams for underlying connection */
-    private final OutputStream out;
-    private final InputStream in;
-
-    /** channels wrapped around underlying I/O streams */
-    private final WritableByteChannel outChannel;
-    private final ReadableByteChannel inChannel;
-
-    /**
-     * queue of buffers of data to be sent over connection, interspersed
-     * with IOFuture objects that need to be notified in sequence
-     * 
-     * Synchronised on super.mux.muxLock;
-     */
-    private final Deque sendQueue;
-
-    
-    /**
-     * Creates a new StreamConnectionIO for the connection represented by
-     * the supplied OutputStream and InputStream pair.
-     */
-    StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
-       super(mux);
-       this.out = out;
-//     this.out = new BufferedOutputStream(out);
-       this.in = in;
-
-       outChannel = newChannel(out);
-       inChannel = newChannel(in);
-        sendQueue = new LinkedList();
-    }
-
-    /**
-     * Starts processing connection data.  This method starts
-     * asynchronous actions to read and write from the connection.
-     */
-    @Override
-    void start() throws IOException {
-       try {
-           systemThreadPool.execute(new Writer(), "mux writer");
-           systemThreadPool.execute(new Reader(), "mux reader");
-       } catch (OutOfMemoryError e) {  // assume out of threads
-           try {
-               logger.log(Level.WARNING,
-                          "could not create thread for request dispatch", e);
-           } catch (Throwable t) {
-           }
-           throw new IOException("could not create I/O threads", e);
-       }
-    }
-
-    @Override
-    void asyncSend(ByteBuffer buffer) {
-       synchronized (mux.muxLock) {
-           if (mux.muxDown) {
-               return;
-           }
-           sendQueue.addLast(buffer);
-           mux.muxLock.notifyAll();
-       }
-    }
-
-    @Override
-    void asyncSend(ByteBuffer first, ByteBuffer second) {
-       synchronized (mux.muxLock) {
-           if (mux.muxDown) {
-               return;
-           }
-           sendQueue.addLast(first);
-           sendQueue.addLast(second);
-           mux.muxLock.notifyAll();
-       }
-    }
-
-    @Override
-    IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
-       synchronized (mux.muxLock) {
-           IOFuture future = new IOFuture();
-           if (mux.muxDown) {
-               IOException ioe = new IOException(mux.muxDownMessage);
-               ioe.initCause(mux.muxDownCause);
-               future.done(ioe);
-               return future;
-           }
-           sendQueue.addLast(first);
-           sendQueue.addLast(second);
-           sendQueue.addLast(future);
-           mux.muxLock.notifyAll();
-           return future;
-       }
-       /*
-        * REMIND: Can/should we implement any sort of
-        * priority inversion avoidance scheme here?
-        */
-    }
-
-    private class Writer implements Runnable {
-       Writer() { }
-
-        @Override
-       public void run() {
-           Deque localQueue = null;
-           try {
-               while (true) {
-                   synchronized (mux.muxLock) {
-                       while (!mux.muxDown && sendQueue.isEmpty()) {
-                           /*
-                            * REMIND: Should we use a timeout here, to send
-                            * occasional PING messages during periods of
-                            * inactivity, to make sure connection is alive?
-                            */
-                           mux.muxLock.wait();
-                           /*
-                            * Let an interrupt during the wait just kill this
-                            * thread, because an interrupt during an I/O write
-                            * would leave it in an unrecoverable state anyway.
-                            */
-                       }
-                       if (mux.muxDown && sendQueue.isEmpty()) {
-                           logger.log(Level.FINEST,
-                                      "mux writer thread dying, connection " +
-                                      "down and nothing more to send");
-                           break;
-                       }
-                        /* Clone an unshared copy and clear the queue while 
synchronized */
-                       localQueue = new LinkedList(sendQueue);
-                       sendQueue.clear();
-                   }
-
-                   boolean needToFlush = false;
-                    ByteBuffer last = null;
-                    int lastIndex = Integer.MIN_VALUE;
-                   for  ( int i = 0; !localQueue.isEmpty(); i++) {
-                       Object next = localQueue.getFirst();
-                       if (next instanceof ByteBuffer) {
-                            ByteBuffer buffer = (ByteBuffer) next;
-                           outChannel.write((buffer));
-                            last = buffer;
-                            lastIndex = i;
-                           needToFlush = true;
-                       } else {
-                           assert next instanceof IOFuture;
-                           if (needToFlush) {
-                               out.flush();
-                               needToFlush = false;
-                           }
-                            if (lastIndex == i - 1 && last != null){
-                                ((IOFuture) next).done(last.position());
-                            } else {
-                                ((IOFuture) next).done();
-                            }
-                       }
-                       localQueue.removeFirst();
-                   }
-                   if (needToFlush) {
-                       out.flush();
-                   }
-               }
-           } catch (InterruptedException e) {
-               try {
-                   logger.log(Level.WARNING,
-                              "mux writer thread dying, interrupted", e);
-               } catch (Throwable t) {
-               }
-               mux.setDown("mux writer thread interrupted", e);
-           } catch (IOException e) {
-               try {
-                   logger.log(Levels.HANDLED,
-                              "mux writer thread dying, I/O error", e);
-               } catch (Throwable t) {
-               }
-               mux.setDown("I/O error writing to mux connection: " +
-                           e.toString(), e);
-           } catch (Throwable t) {
-               try {
-                   logger.log(Level.WARNING,
-                       "mux writer thread dying, unexpected exception", t);
-               } catch (Throwable tt) {
-               }
-               mux.setDown("unexpected exception in mux writer thread: " +
-                           t.toString(), t);
-           } finally {
-               synchronized (mux.muxLock) {
-                   assert mux.muxDown;
-                   if (localQueue != null) {
-                       drainQueue(localQueue);
-                   }
-                   drainQueue(sendQueue);
-               }
-               try {
-                   outChannel.close();
-               } catch (IOException e) {
-               }
-           }
-       }
-    }
-
-    private void drainQueue(Deque queue) {
-       while (!queue.isEmpty()) {
-           Object next = queue.removeFirst();
-           if (next instanceof IOFuture) {
-               IOException ioe = new IOException(mux.muxDownMessage);
-               ioe.initCause(mux.muxDownCause);
-               ((IOFuture) next).done(ioe);
-           }
-       }
-    }
-
-    private class Reader implements Runnable {
-        /** buffer for reading incoming data from connection */
-        private final ByteBuffer inputBuffer =
-            ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);  // ready for reading
-
-       Reader() { }
-
-       public void run() {
-           try {
-               while (true) {
-                   int n = inChannel.read(inputBuffer);
-                   if (n == -1) {
-                       throw new EOFException();
-                   }
-                   assert n > 0;       // channel is assumed to be blocking
-                   mux.processIncomingData(inputBuffer);
-                   assert inputBuffer.hasRemaining();
-               }
-           } catch (ProtocolException e) {
-               IOFuture future = null;
-               synchronized (mux.muxLock) {
-                   /*
-                    * If mux connection is already down, then we probably got
-                    * here because of the receipt of a normal protocol-ending
-                    * message, like Shutdown or Error, or else something else
-                    * went wrong anyway.  Otherwise, a real protocol violation
-                    * was detected, so respond with an Error message before
-                    * taking down the whole mux connection.
-                    */
-                   if (!mux.muxDown) {
-                       try {
-                           logger.log(Levels.HANDLED,
-                               "mux reader thread dying, protocol error", e);
-                       } catch (Throwable t) {
-                       }
-                       future = mux.futureSendError(e.getMessage());
-                       mux.setDown("protocol violation detected: " +   
-                                   e.getMessage(), null);
-                   } else {
-                       try {
-                           logger.log(Level.FINEST,
-                               "mux reader thread dying: " + e.getMessage());
-                       } catch (Throwable t) {
-                       }
-                   }
-               }
-               if (future != null) {
-                   try {
-                       future.waitUntilDone();
-                   } catch (IOException ignore) {
-                   } catch (InterruptedException interrupt) {
-                       Thread.currentThread().interrupt();
-                   }
-               }
-           } catch (IOException e) {
-               try {
-                   logger.log(Levels.HANDLED,
-                              "mux reader thread dying, I/O error", e);
-               } catch (Throwable t) {
-               }
-               mux.setDown("I/O error reading from mux connection: " +
-                           e.toString(), e);
-           } catch (Throwable t) {
-               try {
-                   logger.log(Level.WARNING,
-                       "mux reader thread dying, unexpected exception", t);
-               } catch (Throwable tt) {
-               }
-               mux.setDown("unexpected exception in mux reader thread: " +
-                           t.toString(), t);
-           } finally {
-               try {
-                   inChannel.close();
-               } catch (IOException e) {
-               }
-           }
-       }
-    }
-
-    /**
-     * The following two methods are modifications of their
-     * equivalents in java.nio.channels.Channels with the assumption
-     * that the supplied byte buffers are backed by arrays, so no
-     * additional copying is required.
-     */
-
-    public static ReadableByteChannel newChannel(final InputStream in) {
-       return new ReadableByteChannel() {
-           private volatile boolean open = true;
-
-            // must be synchronized as per ReadableByteChannel contract
-            @Override
-           public synchronized int read(ByteBuffer dst) throws IOException {
-               assert dst.hasArray();
-               byte[] array = dst.array();
-               int arrayOffset = dst.arrayOffset();
-
-               int totalRead = 0;
-               int bytesRead = 0;
-               int bytesToRead;
-               while ((bytesToRead = dst.remaining()) > 0) {
-                   if ((totalRead > 0) && !(in.available() > 0)) {
-                       break; // block at most once
-                   }
-                   int pos = dst.position();
-                   bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
-                   if (bytesRead < 0) {
-                       break;
-                   } else {
-                       dst.position(pos + bytesRead);
-                       totalRead += bytesRead;
-                   }
-               }
-               if ((bytesRead < 0) && (totalRead == 0)) {
-                   return -1;
-               }
-
-               return totalRead;
-           }
-                
-            @Override
-           public boolean isOpen() {
-               return open;
-           }
-            
-            // Blocking as per Channel contract
-            @Override
-           public synchronized void close() throws IOException {
-               in.close();
-               open = false;
-           }
-       };
-    }
-
-    public static WritableByteChannel newChannel(final OutputStream out) {
-       return new WritableByteChannel() {
-           private volatile boolean open = true;
-            
-            // This method must block while writing as per WritableByteChannel 
contract.
-            @Override
-           public synchronized int write(ByteBuffer src) throws IOException {
-                    assert src.hasArray();
-
-                    int len = src.remaining();
-                    if (len > 0) {
-                        int pos = src.position();
-                        out.write(src.array(), src.arrayOffset() + pos, len);
-                        src.position(pos + len);
-                    }
-                    return len;
-                }
-                
-            @Override
-           public boolean isOpen() {
-               return open;
-           }
-
-            // This method must block as per the Channel contract
-            @Override
-           public synchronized void close() throws IOException {
-               out.close();
-               open = false;
-           }
-       };
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.jeri.internal.mux;
+
+import org.apache.river.logging.Levels;
+import org.apache.river.thread.Executor;
+import org.apache.river.thread.GetThreadPoolAction;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.security.AccessController;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * StreamConnectionIO implements the ConnectionIO abstraction for a
+ * connection accessible through standard (blocking) I/O streams, i.e.
+ * java.io.OutputStream and java.io.InputStream.
+ *
+ * @author Sun Microsystems, Inc.
+ **/
+final class StreamConnectionIO extends ConnectionIO {
+
+    private static final int RECEIVE_BUFFER_SIZE = 2048;
+
+    /**
+     * pool of threads for executing tasks in system thread group:
+     * used for I/O (reader and writer) threads
+     */
+    private static final Executor systemThreadPool =
+       (Executor) AccessController.doPrivileged(
+           new GetThreadPoolAction(false));
+
+    /** mux logger */
+    private static final Logger logger =
+       Logger.getLogger("net.jini.jeri.connection.mux");
+
+    /** I/O streams for underlying connection */
+    private final OutputStream out;
+    private final InputStream in;
+
+    /** channels wrapped around underlying I/O streams */
+    private final WritableByteChannel outChannel;
+    private final ReadableByteChannel inChannel;
+
+    /**
+     * queue of buffers of data to be sent over connection, interspersed
+     * with IOFuture objects that need to be notified in sequence
+     * 
+     * Synchronised on super.mux.muxLock;
+     */
+    private final Deque sendQueue;
+
+    
+    /**
+     * Creates a new StreamConnectionIO for the connection represented by
+     * the supplied OutputStream and InputStream pair.
+     */
+    StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
+       super(mux);
+       this.out = out;
+//     this.out = new BufferedOutputStream(out);
+       this.in = in;
+
+       outChannel = newChannel(out);
+       inChannel = newChannel(in);
+        sendQueue = new LinkedList();
+    }
+
+    /**
+     * Starts processing connection data.  This method starts
+     * asynchronous actions to read and write from the connection.
+     */
+    @Override
+    void start() throws IOException {
+       try {
+           systemThreadPool.execute(new Writer(), "mux writer");
+           systemThreadPool.execute(new Reader(), "mux reader");
+       } catch (OutOfMemoryError e) {  // assume out of threads
+           try {
+               logger.log(Level.WARNING,
+                          "could not create thread for request dispatch", e);
+           } catch (Throwable t) {
+           }
+           throw new IOException("could not create I/O threads", e);
+       }
+    }
+
+    @Override
+    void asyncSend(ByteBuffer buffer) {
+       synchronized (mux.muxLock) {
+           if (mux.muxDown) {
+               return;
+           }
+           sendQueue.addLast(buffer);
+           mux.muxLock.notifyAll();
+       }
+    }
+
+    @Override
+    void asyncSend(ByteBuffer first, ByteBuffer second) {
+       synchronized (mux.muxLock) {
+           if (mux.muxDown) {
+               return;
+           }
+           sendQueue.addLast(first);
+           sendQueue.addLast(second);
+           mux.muxLock.notifyAll();
+       }
+    }
+
+    @Override
+    IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
+       synchronized (mux.muxLock) {
+           IOFuture future = new IOFuture();
+           if (mux.muxDown) {
+               IOException ioe = new IOException(mux.muxDownMessage);
+               ioe.initCause(mux.muxDownCause);
+               future.done(ioe);
+               return future;
+           }
+           sendQueue.addLast(first);
+           sendQueue.addLast(second);
+           sendQueue.addLast(future);
+           mux.muxLock.notifyAll();
+           return future;
+       }
+       /*
+        * REMIND: Can/should we implement any sort of
+        * priority inversion avoidance scheme here?
+        */
+    }
+
+    private class Writer implements Runnable {
+       Writer() { }
+
+        @Override
+       public void run() {
+           Deque localQueue = null;
+           try {
+               while (true) {
+                   synchronized (mux.muxLock) {
+                       while (!mux.muxDown && sendQueue.isEmpty()) {
+                           /*
+                            * REMIND: Should we use a timeout here, to send
+                            * occasional PING messages during periods of
+                            * inactivity, to make sure connection is alive?
+                            */
+                           mux.muxLock.wait();
+                           /*
+                            * Let an interrupt during the wait just kill this
+                            * thread, because an interrupt during an I/O write
+                            * would leave it in an unrecoverable state anyway.
+                            */
+                       }
+                       if (mux.muxDown && sendQueue.isEmpty()) {
+                           logger.log(Level.FINEST,
+                                      "mux writer thread dying, connection " +
+                                      "down and nothing more to send");
+                           break;
+                       }
+                        /* Clone an unshared copy and clear the queue while 
synchronized */
+                       localQueue = new LinkedList(sendQueue);
+                       sendQueue.clear();
+                   }
+
+                   boolean needToFlush = false;
+                    ByteBuffer last = null;
+                    int lastIndex = Integer.MIN_VALUE;
+                   for  ( int i = 0; !localQueue.isEmpty(); i++) {
+                       Object next = localQueue.getFirst();
+                       if (next instanceof ByteBuffer) {
+                            ByteBuffer buffer = (ByteBuffer) next;
+                           outChannel.write((buffer));
+                            last = buffer;
+                            lastIndex = i;
+                           needToFlush = true;
+                       } else {
+                           assert next instanceof IOFuture;
+                           if (needToFlush) {
+                               out.flush();
+                               needToFlush = false;
+                           }
+                            if (lastIndex == i - 1 && last != null){
+                                ((IOFuture) next).done(last.position());
+                            } else {
+                                ((IOFuture) next).done();
+                            }
+                       }
+                       localQueue.removeFirst();
+                   }
+                   if (needToFlush) {
+                       out.flush();
+                   }
+               }
+           } catch (InterruptedException e) {
+               try {
+                   logger.log(Level.WARNING,
+                              "mux writer thread dying, interrupted", e);
+               } catch (Throwable t) {
+               }
+               mux.setDown("mux writer thread interrupted", e);
+           } catch (IOException e) {
+               try {
+                   logger.log(Levels.HANDLED,
+                              "mux writer thread dying, I/O error", e);
+               } catch (Throwable t) {
+               }
+               mux.setDown("I/O error writing to mux connection: " +
+                           e.toString(), e);
+           } catch (Throwable t) {
+               try {
+                   logger.log(Level.WARNING,
+                       "mux writer thread dying, unexpected exception", t);
+               } catch (Throwable tt) {
+               }
+               mux.setDown("unexpected exception in mux writer thread: " +
+                           t.toString(), t);
+           } finally {
+               synchronized (mux.muxLock) {
+                   assert mux.muxDown;
+                   if (localQueue != null) {
+                       drainQueue(localQueue);
+                   }
+                   drainQueue(sendQueue);
+               }
+               try {
+                   outChannel.close();
+               } catch (IOException e) {
+               }
+           }
+       }
+    }
+
+    private void drainQueue(Deque queue) {
+       while (!queue.isEmpty()) {
+           Object next = queue.removeFirst();
+           if (next instanceof IOFuture) {
+               IOException ioe = new IOException(mux.muxDownMessage);
+               ioe.initCause(mux.muxDownCause);
+               ((IOFuture) next).done(ioe);
+           }
+       }
+    }
+
+    private class Reader implements Runnable {
+        /** buffer for reading incoming data from connection */
+        private final ByteBuffer inputBuffer =
+            ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);  // ready for reading
+
+       Reader() { }
+
+       public void run() {
+           try {
+               while (true) {
+                   int n = inChannel.read(inputBuffer);
+                   if (n == -1) {
+                       throw new EOFException();
+                   }
+                   assert n > 0;       // channel is assumed to be blocking
+                   mux.processIncomingData(inputBuffer);
+                   assert inputBuffer.hasRemaining();
+               }
+           } catch (ProtocolException e) {
+               IOFuture future = null;
+               synchronized (mux.muxLock) {
+                   /*
+                    * If mux connection is already down, then we probably got
+                    * here because of the receipt of a normal protocol-ending
+                    * message, like Shutdown or Error, or else something else
+                    * went wrong anyway.  Otherwise, a real protocol violation
+                    * was detected, so respond with an Error message before
+                    * taking down the whole mux connection.
+                    */
+                   if (!mux.muxDown) {
+                       try {
+                           logger.log(Levels.HANDLED,
+                               "mux reader thread dying, protocol error", e);
+                       } catch (Throwable t) {
+                       }
+                       future = mux.futureSendError(e.getMessage());
+                       mux.setDown("protocol violation detected: " +   
+                                   e.getMessage(), null);
+                   } else {
+                       try {
+                           logger.log(Level.FINEST,
+                               "mux reader thread dying: " + e.getMessage());
+                       } catch (Throwable t) {
+                       }
+                   }
+               }
+               if (future != null) {
+                   try {
+                       future.waitUntilDone();
+                   } catch (IOException ignore) {
+                   } catch (InterruptedException interrupt) {
+                       Thread.currentThread().interrupt();
+                   }
+               }
+           } catch (IOException e) {
+               try {
+                   logger.log(Levels.HANDLED,
+                              "mux reader thread dying, I/O error", e);
+               } catch (Throwable t) {
+               }
+               mux.setDown("I/O error reading from mux connection: " +
+                           e.toString(), e);
+           } catch (Throwable t) {
+               try {
+                   logger.log(Level.WARNING,
+                       "mux reader thread dying, unexpected exception", t);
+               } catch (Throwable tt) {
+               }
+               mux.setDown("unexpected exception in mux reader thread: " +
+                           t.toString(), t);
+           } finally {
+               try {
+                   inChannel.close();
+               } catch (IOException e) {
+               }
+           }
+       }
+    }
+
+    /**
+     * The following two methods are modifications of their
+     * equivalents in java.nio.channels.Channels with the assumption
+     * that the supplied byte buffers are backed by arrays, so no
+     * additional copying is required.
+     */
+
+    public static ReadableByteChannel newChannel(final InputStream in) {
+       return new ReadableByteChannel() {
+           private boolean open = true;
+
+            // must be synchronized as per ReadableByteChannel contract
+            @Override
+           public synchronized int read(ByteBuffer dst) throws IOException {
+               assert dst.hasArray();
+               byte[] array = dst.array();
+               int arrayOffset = dst.arrayOffset();
+
+               int totalRead = 0;
+               int bytesRead = 0;
+               int bytesToRead;
+               while ((bytesToRead = dst.remaining()) > 0) {
+                   if ((totalRead > 0) && !(in.available() > 0)) {
+                       break; // block at most once
+                   }
+                   int pos = dst.position();
+                   bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
+                   if (bytesRead < 0) {
+                       break;
+                   } else {
+                       dst.position(pos + bytesRead);
+                       totalRead += bytesRead;
+                   }
+               }
+               if ((bytesRead < 0) && (totalRead == 0)) {
+                   return -1;
+               }
+
+               return totalRead;
+           }
+                
+            @Override
+           public synchronized boolean isOpen() {
+               return open;
+           }
+            
+            // Blocking as per Channel contract
+            @Override
+           public synchronized void close() throws IOException {
+               in.close();
+               open = false;
+           }
+       };
+    }
+
+    public static WritableByteChannel newChannel(final OutputStream out) {
+       return new WritableByteChannel() {
+           private volatile boolean open = true;
+            
+            // This method must block while writing as per WritableByteChannel 
contract.
+            @Override
+           public synchronized int write(ByteBuffer src) throws IOException {
+                    assert src.hasArray();
+
+                    int len = src.remaining();
+                    if (len > 0) {
+                        int pos = src.position();
+                        out.write(src.array(), src.arrayOffset() + pos, len);
+                        src.position(pos + len);
+                    }
+                    return len;
+                }
+                
+            @Override
+           public boolean isOpen() {
+               return open;
+           }
+
+            // This method must block as per the Channel contract
+            @Override
+           public synchronized void close() throws IOException {
+               out.close();
+               open = false;
+           }
+       };
+    }
+
+}


Reply via email to