In order to properly review changes, it would be great to know what the problem 
it is that you’re fixing - could you share?

Cheers,

Greg Trasuk
> On Nov 26, 2015, at 6:56 AM, peter_firmst...@apache.org wrote:
> 
> Author: peter_firmstone
> Date: Thu Nov 26 11:56:32 2015
> New Revision: 1716613
> 
> URL: http://svn.apache.org/viewvc?rev=1716613&view=rev
> Log:
> Commit my local Jeri multiplexer stability improvements to assist with jtreg 
> multiplexer nio tests:
> 
> net.jini.jeri.tcp.outOfThreads.OutOfThreads.java
> net.jini.jeri.tcp.outOfThreads.OutOfThreads2.java
> 
> Modified:
>    
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
>    
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
>    
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
>    
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
>    
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
> 
> Modified: 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
> URL: 
> http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
>  (original)
> +++ 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
>  Thu Nov 26 11:56:32 2015
> @@ -34,9 +34,7 @@ import java.nio.charset.CharsetEncoder;
> import java.security.AccessController;
> import java.util.BitSet;
> import java.util.Deque;
> -import java.util.HashMap;
> import java.util.LinkedList;
> -import java.util.Map;
> import java.util.logging.Level;
> import java.util.logging.Logger;
> 
> @@ -107,6 +105,7 @@ abstract class Mux {
> 
>       public void run() {
>           for (int i = 0; i < sessions.length; i++) {
> +             if (sessions[i] != null)
>               sessions[i].setDown(message, cause);
>           }
>       }
> @@ -135,7 +134,7 @@ abstract class Mux {
>     Throwable muxDownCause;
> 
>     final BitSet busySessions = new BitSet();
> -    final Map<Integer,Session> sessions = new HashMap<Integer,Session>(128);
> +    final Session [] sessions = new Session[MAX_SESSION_ID + 1];
> 
>     private int expectedPingCookie = -1;
> 
> @@ -274,10 +273,15 @@ abstract class Mux {
>       assert Thread.holdsLock(muxLock);
>       assert !muxDown;
>       assert !busySessions.get(sessionID);
> -     assert sessions.get(Integer.valueOf(sessionID)) == null;
> +//   assert sessions.get(Byte.valueOf(sessionID)) == null;
> +     assert sessions[sessionID] == null;
> 
>       busySessions.set(sessionID);
> -     sessions.put(Integer.valueOf(sessionID), session);
> +//   Throwable t = new Throwable();
> +//   System.out.println("Setting sessionID: "+ sessionID);
> +//   t.printStackTrace(System.out);
> +//   sessions.put(Byte.valueOf(sessionID), session);
> +     sessions[sessionID] = session;
>     }
> 
>     /**
> @@ -285,14 +289,17 @@ abstract class Mux {
>      * This method is intended to be invoked by this class and
>      * subclasses only.
>      *
> -     * This method MAY be invoked while synchronized on muxLock.
> +     * This method MAY be invoked while synchronized on muxLock if failure
> +     * occurs during start up.
>      */
>     final void setDown(final String message, final Throwable cause) {
> +     SessionShutdownTask sst = null;
>       synchronized (muxLock) {
>           if (muxDown) return;
>           muxDown = true;
>           muxDownMessage = message;
>           muxDownCause = cause;
> +         sst = new SessionShutdownTask(sessions.clone(), message, cause);
>           muxLock.notifyAll();
>       }
> 
> @@ -309,11 +316,8 @@ abstract class Mux {
>              */
>       boolean needWorker = false;
>             synchronized (sessionShutdownQueue) {
> -                if (!sessions.isEmpty()) {
> -                    sessionShutdownQueue.add(new SessionShutdownTask(
> -                        (Session[]) sessions.values().toArray(
> -                            new Session[sessions.values().size()]),
> -                        message, cause));
> +                if (sst != null) {
> +                    sessionShutdownQueue.add(sst);
>                     needWorker = true;
>                 } else {
>                     needWorker = !sessionShutdownQueue.isEmpty();
> @@ -360,7 +364,7 @@ abstract class Mux {
>           }
>           assert busySessions.get(sessionID);
>           busySessions.clear(sessionID);
> -         sessions.remove(Integer.valueOf(sessionID));
> +         sessions[sessionID] = null;
>       }
>     }
> 
> @@ -1178,8 +1182,7 @@ abstract class Mux {
>       getSession(sessionID).handleAcknowledgment();
>     }
> 
> -    private void handleData(int sessionID, boolean open, boolean close,
> -                         boolean eof, boolean ackRequired, ByteBuffer data)
> +    private void handleData(int sessionID, boolean open, boolean close, 
> boolean eof, boolean ackRequired, ByteBuffer data)
>       throws ProtocolException
>     {
>       if (logger.isLoggable(Level.FINEST)) {
> @@ -1219,7 +1222,7 @@ abstract class Mux {
>               throw new ProtocolException(
>                   "inactive sessionID: " + sessionID);
>           }
> -         return (Session) sessions.get(Integer.valueOf(sessionID));
> +         return sessions[sessionID];
>       }
>     }
> 
> 
> Modified: 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
> URL: 
> http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
>  (original)
> +++ 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
>  Thu Nov 26 11:56:32 2015
> @@ -1,129 +1,129 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - * 
> - *      http://www.apache.org/licenses/LICENSE-2.0
> - * 
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import org.apache.river.action.GetIntegerAction;
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.io.OutputStream;
> -import java.nio.channels.SocketChannel;
> -import java.security.AccessController;
> -import java.util.Collection;
> -import net.jini.jeri.OutboundRequest;
> -
> -/**
> - * A MuxClient controls the client side of multiplexed connection.
> - *
> - * @author Sun Microsystems, Inc.
> - **/
> -public class MuxClient extends Mux {
> -
> -    /** initial inbound ration as client, default is 32768 */
> -    private static final int clientInitialInboundRation =
> -     ((Integer) AccessController.doPrivileged(new GetIntegerAction(
> -         "org.apache.river.jeri.connection.mux.client.initialInboundRation",
> -         32768))).intValue();
> -
> -    /**
> -     * Initiates the client side of the multiplexed connection over
> -     * the given input/output stream pair.
> -     *
> -     * @param out the output stream of the underlying connection
> -     *
> -     * @param in the input stream of the underlying connection
> -     **/
> -    public MuxClient(OutputStream out, InputStream in) throws IOException {
> -     super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
> -    }
> -
> -    public MuxClient(SocketChannel channel) throws IOException {
> -     super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
> -    }
> -
> -    /**
> -     * Starts a new request over this connection, returning the
> -     * corresponding OutboundRequest object.
> -     *
> -     * @return the OutboundRequest for the newly created request
> -     **/
> -    public OutboundRequest newRequest()      throws IOException {
> -     synchronized (muxLock) {
> -         if (muxDown) {
> -             IOException ioe = new IOException(muxDownMessage);
> -             ioe.initCause(muxDownCause);
> -             throw ioe;
> -         }
> -         int sessionID = busySessions.nextClearBit(0);
> -         if (sessionID > Mux.MAX_SESSION_ID) {
> -             throw new IOException("no free sessions");
> -         }
> -
> -         Session session = new Session(this, sessionID, Session.CLIENT);
> -         addSession(sessionID, session);
> -         return session.getOutboundRequest();
> -     }
> -    }
> -
> -    /**
> -     * Returns the current number of requests in progress over this
> -     * connection.
> -     *
> -     * The value is guaranteed to not increase until the next
> -     * invocation of the newRequest method.
> -     *
> -     * @return the number of requests in progress over this connection
> -     *
> -     * @throws IOException if the multiplexed connection is no longer
> -     * active
> -     **/
> -    public int requestsInProgress() throws IOException {
> -     synchronized (muxLock) {
> -         if (muxDown) {
> -             IOException ioe = new IOException(muxDownMessage);
> -             ioe.initCause(muxDownCause);
> -             throw ioe;
> -         }
> -         return busySessions.cardinality();
> -     }
> -    }
> -
> -    /**
> -     * Shuts down this multiplexed connection.  Requests in progress
> -     * will throw IOException for future I/O operations.
> -     *
> -     * @param message reason for shutdown to be included in
> -     * IOExceptions thrown from future I/O operations
> -     **/
> -    public void shutdown(String message) {
> -     synchronized (muxLock) {
> -         setDown(message, null);
> -     }
> -    }
> -
> -    /**
> -     * Populates the context collection with information representing
> -     * this connection.
> -     *
> -     * This method should be overridden by subclasses to implement the
> -     * desired behavior of the populateContext method for
> -     * OutboundRequest instances generated for this connection.
> -     **/
> -    protected void populateContext(Collection context) {
> -    }
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + * 
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import org.apache.river.action.GetIntegerAction;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.nio.channels.SocketChannel;
> +import java.security.AccessController;
> +import java.util.Collection;
> +import net.jini.jeri.OutboundRequest;
> +
> +/**
> + * A MuxClient controls the client side of multiplexed connection.
> + *
> + * @author Sun Microsystems, Inc.
> + **/
> +public class MuxClient extends Mux {
> +
> +    /** initial inbound ration as client, default is 32768 */
> +    private static final int clientInitialInboundRation =
> +     ((Integer) AccessController.doPrivileged(new GetIntegerAction(
> +         "org.apache.river.jeri.connection.mux.client.initialInboundRation",
> +         32768))).intValue();
> +
> +    /**
> +     * Initiates the client side of the multiplexed connection over
> +     * the given input/output stream pair.
> +     *
> +     * @param out the output stream of the underlying connection
> +     *
> +     * @param in the input stream of the underlying connection
> +     **/
> +    public MuxClient(OutputStream out, InputStream in) throws IOException {
> +     super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
> +    }
> +
> +    public MuxClient(SocketChannel channel) throws IOException {
> +     super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
> +    }
> +
> +    /**
> +     * Starts a new request over this connection, returning the
> +     * corresponding OutboundRequest object.
> +     *
> +     * @return the OutboundRequest for the newly created request
> +     **/
> +    public OutboundRequest newRequest()      throws IOException {
> +     synchronized (muxLock) {
> +         if (muxDown) {
> +             IOException ioe = new IOException(muxDownMessage);
> +             ioe.initCause(muxDownCause);
> +             throw ioe;
> +         }
> +         byte sessionID = (byte) busySessions.nextClearBit(0);
> +         if (sessionID > Mux.MAX_SESSION_ID) {
> +             throw new IOException("no free sessions");
> +         }
> +
> +         Session session = new Session(this, sessionID, Session.CLIENT);
> +         addSession(sessionID, session);
> +         return session.getOutboundRequest();
> +     }
> +    }
> +
> +    /**
> +     * Returns the current number of requests in progress over this
> +     * connection.
> +     *
> +     * The value is guaranteed to not increase until the next
> +     * invocation of the newRequest method.
> +     *
> +     * @return the number of requests in progress over this connection
> +     *
> +     * @throws IOException if the multiplexed connection is no longer
> +     * active
> +     **/
> +    public int requestsInProgress() throws IOException {
> +     synchronized (muxLock) {
> +         if (muxDown) {
> +             IOException ioe = new IOException(muxDownMessage);
> +             ioe.initCause(muxDownCause);
> +             throw ioe;
> +         }
> +         return busySessions.cardinality();
> +     }
> +    }
> +
> +    /**
> +     * Shuts down this multiplexed connection.  Requests in progress
> +     * will throw IOException for future I/O operations.
> +     *
> +     * @param message reason for shutdown to be included in
> +     * IOExceptions thrown from future I/O operations
> +     **/
> +    public void shutdown(String message) {
> +     synchronized (muxLock) {
> +         setDown(message, null);
> +     }
> +    }
> +
> +    /**
> +     * Populates the context collection with information representing
> +     * this connection.
> +     *
> +     * This method should be overridden by subclasses to implement the
> +     * desired behavior of the populateContext method for
> +     * OutboundRequest instances generated for this connection.
> +     **/
> +    protected void populateContext(Collection context) {
> +    }
> +}
> 
> Modified: 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
> URL: 
> http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
>  (original)
> +++ 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
>  Thu Nov 26 11:56:32 2015
> @@ -1,308 +1,308 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - * 
> - *      http://www.apache.org/licenses/LICENSE-2.0
> - * 
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.nio.ByteBuffer;
> -import java.util.Deque;
> -import java.util.LinkedList;
> -
> -/**
> - * Output stream returned by OutboundRequests and InboundRequests for
> - * a session of a multiplexed connection.
> - */
> -class MuxInputStream extends InputStream {
> -    private final Object sessionLock;
> -    private final Session session;
> -    private final Mux mux;
> -    private final Deque<ByteBuffer> inBufQueue;
> -    private IOException sessionDown = null;
> -    private int inBufRemaining = 0;
> -    private int inBufPos = 0;
> -    private boolean inEOF = false;
> -    private boolean inClosed = false;
> -    private boolean sentAcknowledgment = false;
> -
> -    MuxInputStream(Mux mux, Session session, Object sessionLock) {
> -        this.mux = mux;
> -        this.session = session;
> -        this.sessionLock = sessionLock;
> -        this.inBufQueue = new LinkedList<ByteBuffer>();
> -    }
> -
> -    void down(IOException e) {
> -        sessionDown = e;
> -    }
> -
> -    void appendToBufQueue(ByteBuffer data) {
> -        inBufQueue.addLast(data);
> -    }
> -
> -    @Override
> -    public int read() throws IOException {
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            while (inBufRemaining == 0 && sessionDown == null && 
> session.getInState() <= Session.OPEN && !inClosed) {
> -                if (session.getInState() == Session.IDLE) {
> -                    assert session.getOutState() == Session.IDLE;
> -                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
> session.sessionID, null);
> -                    session.setOutState(Session.OPEN);
> -                    session.setInState(Session.OPEN);
> -                }
> -                if (!session.inRationInfinite && session.getInRation() == 0) 
> {
> -                    int inc = mux.initialInboundRation;
> -                    mux.asyncSendIncrementRation(session.sessionID, inc);
> -                    session.setInRation(session.getInRation() + inc);
> -                }
> -                try {
> -                    sessionLock.wait(); // REMIND: timeout?
> -                } catch (InterruptedException e) {
> -                    String message = "request I/O interrupted";
> -                    session.setDown(message, e);
> -                    throw wrap(message, e);
> -                }
> -            }
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            if (inBufRemaining == 0) {
> -                if (inEOF) {
> -                    return -1;
> -                } else {
> -                    if (session.getInState() == Session.TERMINATED) {
> -                        throw new IOException("request aborted by remote 
> endpoint");
> -                    }
> -                    assert sessionDown != null;
> -                    throw sessionDown;
> -                }
> -            }
> -            assert inBufQueue.size() > 0;
> -            int result = -1;
> -            while (result == -1) {
> -                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> -                if (inBufPos < buf.limit()) {
> -                    result = (buf.get() & 0xFF);
> -                    inBufPos++;
> -                    inBufRemaining--;
> -                }
> -                if (inBufPos == buf.limit()) {
> -                    inBufQueue.removeFirst();
> -                    inBufPos = 0;
> -                }
> -            }
> -            if (!session.inRationInfinite) {
> -                checkInboundRation();
> -            }
> -            return result;
> -        }
> -    }
> -
> -    private IOException wrap(String message, Exception e) {
> -        Throwable t;
> -        if (Session.traceSupression()) {
> -            t = e;
> -        } else {
> -            t = e.fillInStackTrace();
> -        }
> -        return new IOException(message, t);
> -    }
> -
> -    @Override
> -    public int read(byte[] b, int off, int len) throws IOException {
> -        if (b == null) {
> -            throw new NullPointerException();
> -        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + 
> len) > b.length) || ((off + len) < 0)) {
> -            throw new IndexOutOfBoundsException();
> -        }
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            } else if (len == 0) {
> -                /*
> -                 * REMIND: What if
> -                 *     - stream is at EOF?
> -                 *     - session was aborted?
> -                 */
> -                return 0;
> -            }
> -            while (inBufRemaining == 0 && sessionDown == null && 
> session.getInState() <= Session.OPEN && !inClosed) {
> -                if (session.getInState() == Session.IDLE) {
> -                    assert session.getOutState() == Session.IDLE;
> -                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
> session.sessionID, null);
> -                    session.setOutState(Session.OPEN);
> -                    session.setInState(Session.OPEN);
> -                }
> -                if (!session.inRationInfinite && session.getInRation() == 0) 
> {
> -                    int inc = mux.initialInboundRation;
> -                    mux.asyncSendIncrementRation(session.sessionID, inc);
> -                    session.setInRation(session.getInRation() + inc);
> -                }
> -                try {
> -                    sessionLock.wait(); // REMIND: timeout?
> -                } catch (InterruptedException e) {
> -                    String message = "request I/O interrupted";
> -                    session.setDown(message, e);
> -                    throw wrap(message, e);
> -                }
> -            }
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            if (inBufRemaining == 0) {
> -                if (inEOF) {
> -                    return -1;
> -                } else {
> -                    if (session.getInState() == Session.TERMINATED) {
> -                        throw new IOException("request aborted by remote 
> endpoint");
> -                    }
> -                    assert sessionDown != null;
> -                    throw sessionDown;
> -                }
> -            }
> -            assert inBufQueue.size() > 0;
> -            int remaining = len;
> -            while (remaining > 0 && inBufRemaining > 0) {
> -                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> -                if (inBufPos < buf.limit()) {
> -                    int toCopy = Math.min(buf.limit() - inBufPos, remaining);
> -                    buf.get(b, off, toCopy);
> -                    inBufPos += toCopy;
> -                    inBufRemaining -= toCopy;
> -                    off += toCopy;
> -                    remaining -= toCopy;
> -                }
> -                if (inBufPos == buf.limit()) {
> -                    inBufQueue.removeFirst();
> -                    inBufPos = 0;
> -                }
> -            }
> -            if (!session.inRationInfinite) {
> -                checkInboundRation();
> -            }
> -            return len - remaining;
> -        }
> -    }
> -
> -    /**
> -     * Sends ration increment, if read drained buffers below
> -     * a certain mark.
> -     *
> -     * This method must NOT be invoked if the inbound ration in
> -     * infinite, and it must ONLY be invoked while synchronized on
> -     * this session's lock.
> -     *
> -     * REMIND: The implementation of this action will be a
> -     * significant area for performance tuning.
> -     */
> -    private void checkInboundRation() {
> -        assert Thread.holdsLock(sessionLock);
> -        assert !session.inRationInfinite;
> -        if (session.getInState() >= Session.FINISHED) {
> -            return;
> -        }
> -        int mark = mux.initialInboundRation / 2;
> -        int used = inBufRemaining + session.getInRation();
> -        if (used <= mark) {
> -            int inc = mux.initialInboundRation - used;
> -            mux.asyncSendIncrementRation(session.sessionID, inc);
> -            session.setInRation(session.getInRation() + inc);
> -        }
> -    }
> -
> -    @Override
> -    public int available() throws IOException {
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            /*
> -             * REMIND: What if
> -             *     - stream is at EOF?
> -             *     - session was aborted?
> -             */
> -            return inBufRemaining;
> -        }
> -    }
> -
> -    @Override
> -    public void close() {
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                return;
> -            }
> -            inClosed = true;
> -            inBufQueue.clear(); // allow GC of unread data
> -            if (session.role == Session.CLIENT && !sentAcknowledgment && 
> session.isReceivedAckRequired() && session.getOutState() < 
> Session.TERMINATED) {
> -                mux.asyncSendAcknowledgment(session.sessionID);
> -                sentAcknowledgment = true;
> -                /*
> -                 * If removing this session from the connection's
> -                 * table was delayed in order to be able to send
> -                 * an Acknowledgment message, then take care of
> -                 * removing it now.
> -                 */
> -                if (session.isRemoveLater()) {
> -                    session.setOutState(Session.TERMINATED);
> -                    mux.removeSession(session.sessionID);
> -                    session.setRemoveLater(false);
> -                }
> -            }
> -            sessionLock.notifyAll();
> -        }
> -    }
> -
> -    /**
> -     * @return the sentAcknowledgment
> -     */
> -    boolean isSentAcknowledgment() {
> -        return sentAcknowledgment;
> -    }
> -
> -    /**
> -     * @return the inBufRemaining
> -     */
> -    int getBufRemaining() {
> -        return inBufRemaining;
> -    }
> -
> -    /**
> -     * @return the inClosed
> -     */
> -    boolean isClosed() {
> -        return inClosed;
> -    }
> -
> -    /**
> -     * @param inBufRemaining the inBufRemaining to set
> -     */
> -    void setBufRemaining(int inBufRemaining) {
> -        this.inBufRemaining = inBufRemaining;
> -    }
> -
> -    /**
> -     * @param inEOF the inEOF to set
> -     */
> -    void setEOF(boolean inEOF) {
> -        this.inEOF = inEOF;
> -    }
> -    
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + * 
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.nio.ByteBuffer;
> +import java.util.Deque;
> +import java.util.LinkedList;
> +
> +/**
> + * Output stream returned by OutboundRequests and InboundRequests for
> + * a session of a multiplexed connection.
> + */
> +class MuxInputStream extends InputStream {
> +    private final Object sessionLock;
> +    private final Session session;
> +    private final Mux mux;
> +    private final Deque<ByteBuffer> inBufQueue;
> +    private IOException sessionDown = null;
> +    private int inBufRemaining = 0;
> +    private int inBufPos = 0;
> +    private boolean inEOF = false;
> +    private boolean inClosed = false;
> +    private boolean sentAcknowledgment = false;
> +
> +    MuxInputStream(Mux mux, Session session, Object sessionLock) {
> +        this.mux = mux;
> +        this.session = session;
> +        this.sessionLock = sessionLock;
> +        this.inBufQueue = new LinkedList<ByteBuffer>();
> +    }
> +
> +    void down(IOException e) {
> +        sessionDown = e;
> +    }
> +
> +    void appendToBufQueue(ByteBuffer data) {
> +        inBufQueue.addLast(data);
> +    }
> +
> +    @Override
> +    public int read() throws IOException {
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            while (inBufRemaining == 0 && sessionDown == null && 
> session.getInState() <= Session.OPEN && !inClosed) {
> +                if (session.getInState() == Session.IDLE) {
> +                    assert session.getOutState() == Session.IDLE;
> +                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
> session.sessionID, null);
> +                    session.setOutState(Session.OPEN);
> +                    session.setInState(Session.OPEN);
> +                }
> +                if (!session.inRationInfinite && session.getInRation() == 0) 
> {
> +                    int inc = mux.initialInboundRation;
> +                    mux.asyncSendIncrementRation(session.sessionID, inc);
> +                    session.setInRation(session.getInRation() + inc);
> +                }
> +                try {
> +                    sessionLock.wait(5000L); // REMIND: timeout?
> +                } catch (InterruptedException e) {
> +                    String message = "request I/O interrupted";
> +                    session.setDown(message, e);
> +                    throw wrap(message, e);
> +                }
> +            }
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            if (inBufRemaining == 0) {
> +                if (inEOF) {
> +                    return -1;
> +                } else {
> +                    if (session.getInState() == Session.TERMINATED) {
> +                        throw new IOException("request aborted by remote 
> endpoint");
> +                    }
> +                    assert sessionDown != null;
> +                    throw sessionDown;
> +                }
> +            }
> +            assert inBufQueue.size() > 0;
> +            int result = -1;
> +            while (result == -1) {
> +                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> +                if (inBufPos < buf.limit()) {
> +                    result = (buf.get() & 0xFF);
> +                    inBufPos++;
> +                    inBufRemaining--;
> +                }
> +                if (inBufPos == buf.limit()) {
> +                    inBufQueue.removeFirst();
> +                    inBufPos = 0;
> +                }
> +            }
> +            if (!session.inRationInfinite) {
> +                checkInboundRation();
> +            }
> +            return result;
> +        }
> +    }
> +
> +    private IOException wrap(String message, Exception e) {
> +        Throwable t;
> +        if (Session.traceSupression()) {
> +            t = e;
> +        } else {
> +            t = e.fillInStackTrace();
> +        }
> +        return new IOException(message, t);
> +    }
> +
> +    @Override
> +    public int read(byte[] b, int off, int len) throws IOException {
> +        if (b == null) {
> +            throw new NullPointerException();
> +        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + 
> len) > b.length) || ((off + len) < 0)) {
> +            throw new IndexOutOfBoundsException();
> +        }
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            } else if (len == 0) {
> +                /*
> +                 * REMIND: What if
> +                 *     - stream is at EOF?
> +                 *     - session was aborted?
> +                 */
> +                return 0;
> +            }
> +            while (inBufRemaining == 0 && sessionDown == null && 
> session.getInState() <= Session.OPEN && !inClosed) {
> +                if (session.getInState() == Session.IDLE) {
> +                    assert session.getOutState() == Session.IDLE;
> +                    mux.asyncSendData(Mux.Data | Mux.Data_open, 
> session.sessionID, null);
> +                    session.setOutState(Session.OPEN);
> +                    session.setInState(Session.OPEN);
> +                }
> +                if (!session.inRationInfinite && session.getInRation() == 0) 
> {
> +                    int inc = mux.initialInboundRation;
> +                    mux.asyncSendIncrementRation(session.sessionID, inc);
> +                    session.setInRation(session.getInRation() + inc);
> +                }
> +                try {
> +                    sessionLock.wait(5000L); // REMIND: timeout?
> +                } catch (InterruptedException e) {
> +                    String message = "request I/O interrupted";
> +                    session.setDown(message, e);
> +                    throw wrap(message, e);
> +                }
> +            }
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            if (inBufRemaining == 0) {
> +                if (inEOF) {
> +                    return -1;
> +                } else {
> +                    if (session.getInState() == Session.TERMINATED) {
> +                        throw new IOException("request aborted by remote 
> endpoint");
> +                    }
> +                    assert sessionDown != null;
> +                    throw sessionDown;
> +                }
> +            }
> +            assert inBufQueue.size() > 0;
> +            int remaining = len;
> +            while (remaining > 0 && inBufRemaining > 0) {
> +                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> +                if (inBufPos < buf.limit()) {
> +                    int toCopy = Math.min(buf.limit() - inBufPos, remaining);
> +                    buf.get(b, off, toCopy);
> +                    inBufPos += toCopy;
> +                    inBufRemaining -= toCopy;
> +                    off += toCopy;
> +                    remaining -= toCopy;
> +                }
> +                if (inBufPos == buf.limit()) {
> +                    inBufQueue.removeFirst();
> +                    inBufPos = 0;
> +                }
> +            }
> +            if (!session.inRationInfinite) {
> +                checkInboundRation();
> +            }
> +            return len - remaining;
> +        }
> +    }
> +
> +    /**
> +     * Sends ration increment, if read drained buffers below
> +     * a certain mark.
> +     *
> +     * This method must NOT be invoked if the inbound ration in
> +     * infinite, and it must ONLY be invoked while synchronized on
> +     * this session's lock.
> +     *
> +     * REMIND: The implementation of this action will be a
> +     * significant area for performance tuning.
> +     */
> +    private void checkInboundRation() {
> +        assert Thread.holdsLock(sessionLock);
> +        assert !session.inRationInfinite;
> +        if (session.getInState() >= Session.FINISHED) {
> +            return;
> +        }
> +        int mark = mux.initialInboundRation / 2;
> +        int used = inBufRemaining + session.getInRation();
> +        if (used <= mark) {
> +            int inc = mux.initialInboundRation - used;
> +            mux.asyncSendIncrementRation(session.sessionID, inc);
> +            session.setInRation(session.getInRation() + inc);
> +        }
> +    }
> +
> +    @Override
> +    public int available() throws IOException {
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            /*
> +             * REMIND: What if
> +             *     - stream is at EOF?
> +             *     - session was aborted?
> +             */
> +            return inBufRemaining;
> +        }
> +    }
> +
> +    @Override
> +    public void close() {
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                return;
> +            }
> +            inClosed = true;
> +            inBufQueue.clear(); // allow GC of unread data
> +            if (session.role == Session.CLIENT && !sentAcknowledgment && 
> session.isReceivedAckRequired() && session.getOutState() < 
> Session.TERMINATED) {
> +                mux.asyncSendAcknowledgment(session.sessionID);
> +                sentAcknowledgment = true;
> +                /*
> +                 * If removing this session from the connection's
> +                 * table was delayed in order to be able to send
> +                 * an Acknowledgment message, then take care of
> +                 * removing it now.
> +                 */
> +                if (session.isRemoveLater()) {
> +                    session.setOutState(Session.TERMINATED);
> +                    mux.removeSession(session.sessionID);
> +                    session.setRemoveLater(false);
> +                }
> +            }
> +            sessionLock.notifyAll();
> +        }
> +    }
> +
> +    /**
> +     * @return the sentAcknowledgment
> +     */
> +    boolean isSentAcknowledgment() {
> +        return sentAcknowledgment;
> +    }
> +
> +    /**
> +     * @return the inBufRemaining
> +     */
> +    int getBufRemaining() {
> +        return inBufRemaining;
> +    }
> +
> +    /**
> +     * @return the inClosed
> +     */
> +    boolean isClosed() {
> +        return inClosed;
> +    }
> +
> +    /**
> +     * @param inBufRemaining the inBufRemaining to set
> +     */
> +    void setBufRemaining(int inBufRemaining) {
> +        this.inBufRemaining = inBufRemaining;
> +    }
> +
> +    /**
> +     * @param inEOF the inEOF to set
> +     */
> +    void setEOF(boolean inEOF) {
> +        this.inEOF = inEOF;
> +    }
> +    
> +}
> 
> Modified: 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
> URL: 
> http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
>  (original)
> +++ 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
>  Thu Nov 26 11:56:32 2015
> @@ -210,7 +210,7 @@ public class MuxServer extends Mux {
>               dispatchNewRequest(sessionID);
>               return;
>           } else {
> -             session = (Session) sessions.get(Integer.valueOf(sessionID));
> +             session = sessions[sessionID];
>               assert session != null;
>           }
>       }
> 
> Modified: 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
> URL: 
> http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
>  (original)
> +++ 
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
>  Thu Nov 26 11:56:32 2015
> @@ -1,434 +1,434 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - * 
> - *      http://www.apache.org/licenses/LICENSE-2.0
> - * 
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import org.apache.river.logging.Levels;
> -import org.apache.river.thread.Executor;
> -import org.apache.river.thread.GetThreadPoolAction;
> -import java.io.EOFException;
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.io.OutputStream;
> -import java.nio.ByteBuffer;
> -import java.nio.channels.ReadableByteChannel;
> -import java.nio.channels.WritableByteChannel;
> -import java.security.AccessController;
> -import java.util.Deque;
> -import java.util.LinkedList;
> -import java.util.logging.Level;
> -import java.util.logging.Logger;
> -
> -/**
> - * StreamConnectionIO implements the ConnectionIO abstraction for a
> - * connection accessible through standard (blocking) I/O streams, i.e.
> - * java.io.OutputStream and java.io.InputStream.
> - *
> - * @author Sun Microsystems, Inc.
> - **/
> -final class StreamConnectionIO extends ConnectionIO {
> -
> -    private static final int RECEIVE_BUFFER_SIZE = 2048;
> -
> -    /**
> -     * pool of threads for executing tasks in system thread group:
> -     * used for I/O (reader and writer) threads
> -     */
> -    private static final Executor systemThreadPool =
> -     (Executor) AccessController.doPrivileged(
> -         new GetThreadPoolAction(false));
> -
> -    /** mux logger */
> -    private static final Logger logger =
> -     Logger.getLogger("net.jini.jeri.connection.mux");
> -
> -    /** I/O streams for underlying connection */
> -    private final OutputStream out;
> -    private final InputStream in;
> -
> -    /** channels wrapped around underlying I/O streams */
> -    private final WritableByteChannel outChannel;
> -    private final ReadableByteChannel inChannel;
> -
> -    /**
> -     * queue of buffers of data to be sent over connection, interspersed
> -     * with IOFuture objects that need to be notified in sequence
> -     * 
> -     * Synchronised on super.mux.muxLock;
> -     */
> -    private final Deque sendQueue;
> -
> -    
> -    /**
> -     * Creates a new StreamConnectionIO for the connection represented by
> -     * the supplied OutputStream and InputStream pair.
> -     */
> -    StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
> -     super(mux);
> -     this.out = out;
> -//   this.out = new BufferedOutputStream(out);
> -     this.in = in;
> -
> -     outChannel = newChannel(out);
> -     inChannel = newChannel(in);
> -        sendQueue = new LinkedList();
> -    }
> -
> -    /**
> -     * Starts processing connection data.  This method starts
> -     * asynchronous actions to read and write from the connection.
> -     */
> -    @Override
> -    void start() throws IOException {
> -     try {
> -         systemThreadPool.execute(new Writer(), "mux writer");
> -         systemThreadPool.execute(new Reader(), "mux reader");
> -     } catch (OutOfMemoryError e) {  // assume out of threads
> -         try {
> -             logger.log(Level.WARNING,
> -                        "could not create thread for request dispatch", e);
> -         } catch (Throwable t) {
> -         }
> -         throw new IOException("could not create I/O threads", e);
> -     }
> -    }
> -
> -    @Override
> -    void asyncSend(ByteBuffer buffer) {
> -     synchronized (mux.muxLock) {
> -         if (mux.muxDown) {
> -             return;
> -         }
> -         sendQueue.addLast(buffer);
> -         mux.muxLock.notifyAll();
> -     }
> -    }
> -
> -    @Override
> -    void asyncSend(ByteBuffer first, ByteBuffer second) {
> -     synchronized (mux.muxLock) {
> -         if (mux.muxDown) {
> -             return;
> -         }
> -         sendQueue.addLast(first);
> -         sendQueue.addLast(second);
> -         mux.muxLock.notifyAll();
> -     }
> -    }
> -
> -    @Override
> -    IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
> -     synchronized (mux.muxLock) {
> -         IOFuture future = new IOFuture();
> -         if (mux.muxDown) {
> -             IOException ioe = new IOException(mux.muxDownMessage);
> -             ioe.initCause(mux.muxDownCause);
> -             future.done(ioe);
> -             return future;
> -         }
> -         sendQueue.addLast(first);
> -         sendQueue.addLast(second);
> -         sendQueue.addLast(future);
> -         mux.muxLock.notifyAll();
> -         return future;
> -     }
> -     /*
> -      * REMIND: Can/should we implement any sort of
> -      * priority inversion avoidance scheme here?
> -      */
> -    }
> -
> -    private class Writer implements Runnable {
> -     Writer() { }
> -
> -        @Override
> -     public void run() {
> -         Deque localQueue = null;
> -         try {
> -             while (true) {
> -                 synchronized (mux.muxLock) {
> -                     while (!mux.muxDown && sendQueue.isEmpty()) {
> -                         /*
> -                          * REMIND: Should we use a timeout here, to send
> -                          * occasional PING messages during periods of
> -                          * inactivity, to make sure connection is alive?
> -                          */
> -                         mux.muxLock.wait();
> -                         /*
> -                          * Let an interrupt during the wait just kill this
> -                          * thread, because an interrupt during an I/O write
> -                          * would leave it in an unrecoverable state anyway.
> -                          */
> -                     }
> -                     if (mux.muxDown && sendQueue.isEmpty()) {
> -                         logger.log(Level.FINEST,
> -                                    "mux writer thread dying, connection " +
> -                                    "down and nothing more to send");
> -                         break;
> -                     }
> -                        /* Clone an unshared copy and clear the queue while 
> synchronized */
> -                     localQueue = new LinkedList(sendQueue);
> -                     sendQueue.clear();
> -                 }
> -
> -                 boolean needToFlush = false;
> -                    ByteBuffer last = null;
> -                    int lastIndex = Integer.MIN_VALUE;
> -                 for  ( int i = 0; !localQueue.isEmpty(); i++) {
> -                     Object next = localQueue.getFirst();
> -                     if (next instanceof ByteBuffer) {
> -                            ByteBuffer buffer = (ByteBuffer) next;
> -                         outChannel.write((buffer));
> -                            last = buffer;
> -                            lastIndex = i;
> -                         needToFlush = true;
> -                     } else {
> -                         assert next instanceof IOFuture;
> -                         if (needToFlush) {
> -                             out.flush();
> -                             needToFlush = false;
> -                         }
> -                            if (lastIndex == i - 1 && last != null){
> -                                ((IOFuture) next).done(last.position());
> -                            } else {
> -                                ((IOFuture) next).done();
> -                            }
> -                     }
> -                     localQueue.removeFirst();
> -                 }
> -                 if (needToFlush) {
> -                     out.flush();
> -                 }
> -             }
> -         } catch (InterruptedException e) {
> -             try {
> -                 logger.log(Level.WARNING,
> -                            "mux writer thread dying, interrupted", e);
> -             } catch (Throwable t) {
> -             }
> -             mux.setDown("mux writer thread interrupted", e);
> -         } catch (IOException e) {
> -             try {
> -                 logger.log(Levels.HANDLED,
> -                            "mux writer thread dying, I/O error", e);
> -             } catch (Throwable t) {
> -             }
> -             mux.setDown("I/O error writing to mux connection: " +
> -                         e.toString(), e);
> -         } catch (Throwable t) {
> -             try {
> -                 logger.log(Level.WARNING,
> -                     "mux writer thread dying, unexpected exception", t);
> -             } catch (Throwable tt) {
> -             }
> -             mux.setDown("unexpected exception in mux writer thread: " +
> -                         t.toString(), t);
> -         } finally {
> -             synchronized (mux.muxLock) {
> -                 assert mux.muxDown;
> -                 if (localQueue != null) {
> -                     drainQueue(localQueue);
> -                 }
> -                 drainQueue(sendQueue);
> -             }
> -             try {
> -                 outChannel.close();
> -             } catch (IOException e) {
> -             }
> -         }
> -     }
> -    }
> -
> -    private void drainQueue(Deque queue) {
> -     while (!queue.isEmpty()) {
> -         Object next = queue.removeFirst();
> -         if (next instanceof IOFuture) {
> -             IOException ioe = new IOException(mux.muxDownMessage);
> -             ioe.initCause(mux.muxDownCause);
> -             ((IOFuture) next).done(ioe);
> -         }
> -     }
> -    }
> -
> -    private class Reader implements Runnable {
> -        /** buffer for reading incoming data from connection */
> -        private final ByteBuffer inputBuffer =
> -            ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);        // ready for 
> reading
> -
> -     Reader() { }
> -
> -     public void run() {
> -         try {
> -             while (true) {
> -                 int n = inChannel.read(inputBuffer);
> -                 if (n == -1) {
> -                     throw new EOFException();
> -                 }
> -                 assert n > 0;       // channel is assumed to be blocking
> -                 mux.processIncomingData(inputBuffer);
> -                 assert inputBuffer.hasRemaining();
> -             }
> -         } catch (ProtocolException e) {
> -             IOFuture future = null;
> -             synchronized (mux.muxLock) {
> -                 /*
> -                  * If mux connection is already down, then we probably got
> -                  * here because of the receipt of a normal protocol-ending
> -                  * message, like Shutdown or Error, or else something else
> -                  * went wrong anyway.  Otherwise, a real protocol violation
> -                  * was detected, so respond with an Error message before
> -                  * taking down the whole mux connection.
> -                  */
> -                 if (!mux.muxDown) {
> -                     try {
> -                         logger.log(Levels.HANDLED,
> -                             "mux reader thread dying, protocol error", e);
> -                     } catch (Throwable t) {
> -                     }
> -                     future = mux.futureSendError(e.getMessage());
> -                     mux.setDown("protocol violation detected: " +   
> -                                 e.getMessage(), null);
> -                 } else {
> -                     try {
> -                         logger.log(Level.FINEST,
> -                             "mux reader thread dying: " + e.getMessage());
> -                     } catch (Throwable t) {
> -                     }
> -                 }
> -             }
> -             if (future != null) {
> -                 try {
> -                     future.waitUntilDone();
> -                 } catch (IOException ignore) {
> -                 } catch (InterruptedException interrupt) {
> -                     Thread.currentThread().interrupt();
> -                 }
> -             }
> -         } catch (IOException e) {
> -             try {
> -                 logger.log(Levels.HANDLED,
> -                            "mux reader thread dying, I/O error", e);
> -             } catch (Throwable t) {
> -             }
> -             mux.setDown("I/O error reading from mux connection: " +
> -                         e.toString(), e);
> -         } catch (Throwable t) {
> -             try {
> -                 logger.log(Level.WARNING,
> -                     "mux reader thread dying, unexpected exception", t);
> -             } catch (Throwable tt) {
> -             }
> -             mux.setDown("unexpected exception in mux reader thread: " +
> -                         t.toString(), t);
> -         } finally {
> -             try {
> -                 inChannel.close();
> -             } catch (IOException e) {
> -             }
> -         }
> -     }
> -    }
> -
> -    /**
> -     * The following two methods are modifications of their
> -     * equivalents in java.nio.channels.Channels with the assumption
> -     * that the supplied byte buffers are backed by arrays, so no
> -     * additional copying is required.
> -     */
> -
> -    public static ReadableByteChannel newChannel(final InputStream in) {
> -     return new ReadableByteChannel() {
> -         private volatile boolean open = true;
> -
> -            // must be synchronized as per ReadableByteChannel contract
> -            @Override
> -         public synchronized int read(ByteBuffer dst) throws IOException {
> -             assert dst.hasArray();
> -             byte[] array = dst.array();
> -             int arrayOffset = dst.arrayOffset();
> -
> -             int totalRead = 0;
> -             int bytesRead = 0;
> -             int bytesToRead;
> -             while ((bytesToRead = dst.remaining()) > 0) {
> -                 if ((totalRead > 0) && !(in.available() > 0)) {
> -                     break; // block at most once
> -                 }
> -                 int pos = dst.position();
> -                 bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
> -                 if (bytesRead < 0) {
> -                     break;
> -                 } else {
> -                     dst.position(pos + bytesRead);
> -                     totalRead += bytesRead;
> -                 }
> -             }
> -             if ((bytesRead < 0) && (totalRead == 0)) {
> -                 return -1;
> -             }
> -
> -             return totalRead;
> -         }
> -                
> -            @Override
> -         public boolean isOpen() {
> -             return open;
> -         }
> -            
> -            // Blocking as per Channel contract
> -            @Override
> -         public synchronized void close() throws IOException {
> -             in.close();
> -             open = false;
> -         }
> -     };
> -    }
> -
> -    public static WritableByteChannel newChannel(final OutputStream out) {
> -     return new WritableByteChannel() {
> -         private volatile boolean open = true;
> -            
> -            // This method must block while writing as per 
> WritableByteChannel contract.
> -            @Override
> -         public synchronized int write(ByteBuffer src) throws IOException {
> -                    assert src.hasArray();
> -
> -                    int len = src.remaining();
> -                    if (len > 0) {
> -                        int pos = src.position();
> -                        out.write(src.array(), src.arrayOffset() + pos, len);
> -                        src.position(pos + len);
> -                    }
> -                    return len;
> -                }
> -                
> -            @Override
> -         public boolean isOpen() {
> -             return open;
> -         }
> -
> -            // This method must block as per the Channel contract
> -            @Override
> -         public synchronized void close() throws IOException {
> -             out.close();
> -             open = false;
> -         }
> -     };
> -    }
> -
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + * 
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import org.apache.river.logging.Levels;
> +import org.apache.river.thread.Executor;
> +import org.apache.river.thread.GetThreadPoolAction;
> +import java.io.EOFException;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.ReadableByteChannel;
> +import java.nio.channels.WritableByteChannel;
> +import java.security.AccessController;
> +import java.util.Deque;
> +import java.util.LinkedList;
> +import java.util.logging.Level;
> +import java.util.logging.Logger;
> +
> +/**
> + * StreamConnectionIO implements the ConnectionIO abstraction for a
> + * connection accessible through standard (blocking) I/O streams, i.e.
> + * java.io.OutputStream and java.io.InputStream.
> + *
> + * @author Sun Microsystems, Inc.
> + **/
> +final class StreamConnectionIO extends ConnectionIO {
> +
> +    private static final int RECEIVE_BUFFER_SIZE = 2048;
> +
> +    /**
> +     * pool of threads for executing tasks in system thread group:
> +     * used for I/O (reader and writer) threads
> +     */
> +    private static final Executor systemThreadPool =
> +     (Executor) AccessController.doPrivileged(
> +         new GetThreadPoolAction(false));
> +
> +    /** mux logger */
> +    private static final Logger logger =
> +     Logger.getLogger("net.jini.jeri.connection.mux");
> +
> +    /** I/O streams for underlying connection */
> +    private final OutputStream out;
> +    private final InputStream in;
> +
> +    /** channels wrapped around underlying I/O streams */
> +    private final WritableByteChannel outChannel;
> +    private final ReadableByteChannel inChannel;
> +
> +    /**
> +     * queue of buffers of data to be sent over connection, interspersed
> +     * with IOFuture objects that need to be notified in sequence
> +     * 
> +     * Synchronised on super.mux.muxLock;
> +     */
> +    private final Deque sendQueue;
> +
> +    
> +    /**
> +     * Creates a new StreamConnectionIO for the connection represented by
> +     * the supplied OutputStream and InputStream pair.
> +     */
> +    StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
> +     super(mux);
> +     this.out = out;
> +//   this.out = new BufferedOutputStream(out);
> +     this.in = in;
> +
> +     outChannel = newChannel(out);
> +     inChannel = newChannel(in);
> +        sendQueue = new LinkedList();
> +    }
> +
> +    /**
> +     * Starts processing connection data.  This method starts
> +     * asynchronous actions to read and write from the connection.
> +     */
> +    @Override
> +    void start() throws IOException {
> +     try {
> +         systemThreadPool.execute(new Writer(), "mux writer");
> +         systemThreadPool.execute(new Reader(), "mux reader");
> +     } catch (OutOfMemoryError e) {  // assume out of threads
> +         try {
> +             logger.log(Level.WARNING,
> +                        "could not create thread for request dispatch", e);
> +         } catch (Throwable t) {
> +         }
> +         throw new IOException("could not create I/O threads", e);
> +     }
> +    }
> +
> +    @Override
> +    void asyncSend(ByteBuffer buffer) {
> +     synchronized (mux.muxLock) {
> +         if (mux.muxDown) {
> +             return;
> +         }
> +         sendQueue.addLast(buffer);
> +         mux.muxLock.notifyAll();
> +     }
> +    }
> +
> +    @Override
> +    void asyncSend(ByteBuffer first, ByteBuffer second) {
> +     synchronized (mux.muxLock) {
> +         if (mux.muxDown) {
> +             return;
> +         }
> +         sendQueue.addLast(first);
> +         sendQueue.addLast(second);
> +         mux.muxLock.notifyAll();
> +     }
> +    }
> +
> +    @Override
> +    IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
> +     synchronized (mux.muxLock) {
> +         IOFuture future = new IOFuture();
> +         if (mux.muxDown) {
> +             IOException ioe = new IOException(mux.muxDownMessage);
> +             ioe.initCause(mux.muxDownCause);
> +             future.done(ioe);
> +             return future;
> +         }
> +         sendQueue.addLast(first);
> +         sendQueue.addLast(second);
> +         sendQueue.addLast(future);
> +         mux.muxLock.notifyAll();
> +         return future;
> +     }
> +     /*
> +      * REMIND: Can/should we implement any sort of
> +      * priority inversion avoidance scheme here?
> +      */
> +    }
> +
> +    private class Writer implements Runnable {
> +     Writer() { }
> +
> +        @Override
> +     public void run() {
> +         Deque localQueue = null;
> +         try {
> +             while (true) {
> +                 synchronized (mux.muxLock) {
> +                     while (!mux.muxDown && sendQueue.isEmpty()) {
> +                         /*
> +                          * REMIND: Should we use a timeout here, to send
> +                          * occasional PING messages during periods of
> +                          * inactivity, to make sure connection is alive?
> +                          */
> +                         mux.muxLock.wait();
> +                         /*
> +                          * Let an interrupt during the wait just kill this
> +                          * thread, because an interrupt during an I/O write
> +                          * would leave it in an unrecoverable state anyway.
> +                          */
> +                     }
> +                     if (mux.muxDown && sendQueue.isEmpty()) {
> +                         logger.log(Level.FINEST,
> +                                    "mux writer thread dying, connection " +
> +                                    "down and nothing more to send");
> +                         break;
> +                     }
> +                        /* Clone an unshared copy and clear the queue while 
> synchronized */
> +                     localQueue = new LinkedList(sendQueue);
> +                     sendQueue.clear();
> +                 }
> +
> +                 boolean needToFlush = false;
> +                    ByteBuffer last = null;
> +                    int lastIndex = Integer.MIN_VALUE;
> +                 for  ( int i = 0; !localQueue.isEmpty(); i++) {
> +                     Object next = localQueue.getFirst();
> +                     if (next instanceof ByteBuffer) {
> +                            ByteBuffer buffer = (ByteBuffer) next;
> +                         outChannel.write((buffer));
> +                            last = buffer;
> +                            lastIndex = i;
> +                         needToFlush = true;
> +                     } else {
> +                         assert next instanceof IOFuture;
> +                         if (needToFlush) {
> +                             out.flush();
> +                             needToFlush = false;
> +                         }
> +                            if (lastIndex == i - 1 && last != null){
> +                                ((IOFuture) next).done(last.position());
> +                            } else {
> +                                ((IOFuture) next).done();
> +                            }
> +                     }
> +                     localQueue.removeFirst();
> +                 }
> +                 if (needToFlush) {
> +                     out.flush();
> +                 }
> +             }
> +         } catch (InterruptedException e) {
> +             try {
> +                 logger.log(Level.WARNING,
> +                            "mux writer thread dying, interrupted", e);
> +             } catch (Throwable t) {
> +             }
> +             mux.setDown("mux writer thread interrupted", e);
> +         } catch (IOException e) {
> +             try {
> +                 logger.log(Levels.HANDLED,
> +                            "mux writer thread dying, I/O error", e);
> +             } catch (Throwable t) {
> +             }
> +             mux.setDown("I/O error writing to mux connection: " +
> +                         e.toString(), e);
> +         } catch (Throwable t) {
> +             try {
> +                 logger.log(Level.WARNING,
> +                     "mux writer thread dying, unexpected exception", t);
> +             } catch (Throwable tt) {
> +             }
> +             mux.setDown("unexpected exception in mux writer thread: " +
> +                         t.toString(), t);
> +         } finally {
> +             synchronized (mux.muxLock) {
> +                 assert mux.muxDown;
> +                 if (localQueue != null) {
> +                     drainQueue(localQueue);
> +                 }
> +                 drainQueue(sendQueue);
> +             }
> +             try {
> +                 outChannel.close();
> +             } catch (IOException e) {
> +             }
> +         }
> +     }
> +    }
> +
> +    private void drainQueue(Deque queue) {
> +     while (!queue.isEmpty()) {
> +         Object next = queue.removeFirst();
> +         if (next instanceof IOFuture) {
> +             IOException ioe = new IOException(mux.muxDownMessage);
> +             ioe.initCause(mux.muxDownCause);
> +             ((IOFuture) next).done(ioe);
> +         }
> +     }
> +    }
> +
> +    private class Reader implements Runnable {
> +        /** buffer for reading incoming data from connection */
> +        private final ByteBuffer inputBuffer =
> +            ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);        // ready for 
> reading
> +
> +     Reader() { }
> +
> +     public void run() {
> +         try {
> +             while (true) {
> +                 int n = inChannel.read(inputBuffer);
> +                 if (n == -1) {
> +                     throw new EOFException();
> +                 }
> +                 assert n > 0;       // channel is assumed to be blocking
> +                 mux.processIncomingData(inputBuffer);
> +                 assert inputBuffer.hasRemaining();
> +             }
> +         } catch (ProtocolException e) {
> +             IOFuture future = null;
> +             synchronized (mux.muxLock) {
> +                 /*
> +                  * If mux connection is already down, then we probably got
> +                  * here because of the receipt of a normal protocol-ending
> +                  * message, like Shutdown or Error, or else something else
> +                  * went wrong anyway.  Otherwise, a real protocol violation
> +                  * was detected, so respond with an Error message before
> +                  * taking down the whole mux connection.
> +                  */
> +                 if (!mux.muxDown) {
> +                     try {
> +                         logger.log(Levels.HANDLED,
> +                             "mux reader thread dying, protocol error", e);
> +                     } catch (Throwable t) {
> +                     }
> +                     future = mux.futureSendError(e.getMessage());
> +                     mux.setDown("protocol violation detected: " +   
> +                                 e.getMessage(), null);
> +                 } else {
> +                     try {
> +                         logger.log(Level.FINEST,
> +                             "mux reader thread dying: " + e.getMessage());
> +                     } catch (Throwable t) {
> +                     }
> +                 }
> +             }
> +             if (future != null) {
> +                 try {
> +                     future.waitUntilDone();
> +                 } catch (IOException ignore) {
> +                 } catch (InterruptedException interrupt) {
> +                     Thread.currentThread().interrupt();
> +                 }
> +             }
> +         } catch (IOException e) {
> +             try {
> +                 logger.log(Levels.HANDLED,
> +                            "mux reader thread dying, I/O error", e);
> +             } catch (Throwable t) {
> +             }
> +             mux.setDown("I/O error reading from mux connection: " +
> +                         e.toString(), e);
> +         } catch (Throwable t) {
> +             try {
> +                 logger.log(Level.WARNING,
> +                     "mux reader thread dying, unexpected exception", t);
> +             } catch (Throwable tt) {
> +             }
> +             mux.setDown("unexpected exception in mux reader thread: " +
> +                         t.toString(), t);
> +         } finally {
> +             try {
> +                 inChannel.close();
> +             } catch (IOException e) {
> +             }
> +         }
> +     }
> +    }
> +
> +    /**
> +     * The following two methods are modifications of their
> +     * equivalents in java.nio.channels.Channels with the assumption
> +     * that the supplied byte buffers are backed by arrays, so no
> +     * additional copying is required.
> +     */
> +
> +    public static ReadableByteChannel newChannel(final InputStream in) {
> +     return new ReadableByteChannel() {
> +         private boolean open = true;
> +
> +            // must be synchronized as per ReadableByteChannel contract
> +            @Override
> +         public synchronized int read(ByteBuffer dst) throws IOException {
> +             assert dst.hasArray();
> +             byte[] array = dst.array();
> +             int arrayOffset = dst.arrayOffset();
> +
> +             int totalRead = 0;
> +             int bytesRead = 0;
> +             int bytesToRead;
> +             while ((bytesToRead = dst.remaining()) > 0) {
> +                 if ((totalRead > 0) && !(in.available() > 0)) {
> +                     break; // block at most once
> +                 }
> +                 int pos = dst.position();
> +                 bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
> +                 if (bytesRead < 0) {
> +                     break;
> +                 } else {
> +                     dst.position(pos + bytesRead);
> +                     totalRead += bytesRead;
> +                 }
> +             }
> +             if ((bytesRead < 0) && (totalRead == 0)) {
> +                 return -1;
> +             }
> +
> +             return totalRead;
> +         }
> +                
> +            @Override
> +         public synchronized boolean isOpen() {
> +             return open;
> +         }
> +            
> +            // Blocking as per Channel contract
> +            @Override
> +         public synchronized void close() throws IOException {
> +             in.close();
> +             open = false;
> +         }
> +     };
> +    }
> +
> +    public static WritableByteChannel newChannel(final OutputStream out) {
> +     return new WritableByteChannel() {
> +         private volatile boolean open = true;
> +            
> +            // This method must block while writing as per 
> WritableByteChannel contract.
> +            @Override
> +         public synchronized int write(ByteBuffer src) throws IOException {
> +                    assert src.hasArray();
> +
> +                    int len = src.remaining();
> +                    if (len > 0) {
> +                        int pos = src.position();
> +                        out.write(src.array(), src.arrayOffset() + pos, len);
> +                        src.position(pos + len);
> +                    }
> +                    return len;
> +                }
> +                
> +            @Override
> +         public boolean isOpen() {
> +             return open;
> +         }
> +
> +            // This method must block as per the Channel contract
> +            @Override
> +         public synchronized void close() throws IOException {
> +             out.close();
> +             open = false;
> +         }
> +     };
> +    }
> +
> +}
> 
> 

Reply via email to