Author: peter_firmstone Date: Thu Nov 26 11:56:32 2015 New Revision: 1716613
URL: http://svn.apache.org/viewvc?rev=1716613&view=rev Log: Commit my local Jeri multiplexer stability improvements to assist with jtreg multiplexer nio tests: net.jini.jeri.tcp.outOfThreads.OutOfThreads.java net.jini.jeri.tcp.outOfThreads.OutOfThreads2.java Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java?rev=1716613&r1=1716612&r2=1716613&view=diff ============================================================================== --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java (original) +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java Thu Nov 26 11:56:32 2015 @@ -34,9 +34,7 @@ import java.nio.charset.CharsetEncoder; import java.security.AccessController; import java.util.BitSet; import java.util.Deque; -import java.util.HashMap; import java.util.LinkedList; -import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -107,6 +105,7 @@ abstract class Mux { public void run() { for (int i = 0; i < sessions.length; i++) { + if (sessions[i] != null) sessions[i].setDown(message, cause); } } @@ -135,7 +134,7 @@ abstract class Mux { Throwable muxDownCause; final BitSet busySessions = new BitSet(); - final Map<Integer,Session> sessions = new HashMap<Integer,Session>(128); + final Session [] sessions = new Session[MAX_SESSION_ID + 1]; private int expectedPingCookie = -1; @@ -274,10 +273,15 @@ abstract class Mux { assert Thread.holdsLock(muxLock); assert !muxDown; assert !busySessions.get(sessionID); - assert sessions.get(Integer.valueOf(sessionID)) == null; +// assert sessions.get(Byte.valueOf(sessionID)) == null; + assert sessions[sessionID] == null; busySessions.set(sessionID); - sessions.put(Integer.valueOf(sessionID), session); +// Throwable t = new Throwable(); +// System.out.println("Setting sessionID: "+ sessionID); +// t.printStackTrace(System.out); +// sessions.put(Byte.valueOf(sessionID), session); + sessions[sessionID] = session; } /** @@ -285,14 +289,17 @@ abstract class Mux { * This method is intended to be invoked by this class and * subclasses only. * - * This method MAY be invoked while synchronized on muxLock. + * This method MAY be invoked while synchronized on muxLock if failure + * occurs during start up. */ final void setDown(final String message, final Throwable cause) { + SessionShutdownTask sst = null; synchronized (muxLock) { if (muxDown) return; muxDown = true; muxDownMessage = message; muxDownCause = cause; + sst = new SessionShutdownTask(sessions.clone(), message, cause); muxLock.notifyAll(); } @@ -309,11 +316,8 @@ abstract class Mux { */ boolean needWorker = false; synchronized (sessionShutdownQueue) { - if (!sessions.isEmpty()) { - sessionShutdownQueue.add(new SessionShutdownTask( - (Session[]) sessions.values().toArray( - new Session[sessions.values().size()]), - message, cause)); + if (sst != null) { + sessionShutdownQueue.add(sst); needWorker = true; } else { needWorker = !sessionShutdownQueue.isEmpty(); @@ -360,7 +364,7 @@ abstract class Mux { } assert busySessions.get(sessionID); busySessions.clear(sessionID); - sessions.remove(Integer.valueOf(sessionID)); + sessions[sessionID] = null; } } @@ -1178,8 +1182,7 @@ abstract class Mux { getSession(sessionID).handleAcknowledgment(); } - private void handleData(int sessionID, boolean open, boolean close, - boolean eof, boolean ackRequired, ByteBuffer data) + private void handleData(int sessionID, boolean open, boolean close, boolean eof, boolean ackRequired, ByteBuffer data) throws ProtocolException { if (logger.isLoggable(Level.FINEST)) { @@ -1219,7 +1222,7 @@ abstract class Mux { throw new ProtocolException( "inactive sessionID: " + sessionID); } - return (Session) sessions.get(Integer.valueOf(sessionID)); + return sessions[sessionID]; } } Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java?rev=1716613&r1=1716612&r2=1716613&view=diff ============================================================================== --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java (original) +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java Thu Nov 26 11:56:32 2015 @@ -1,129 +1,129 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.river.jeri.internal.mux; - -import org.apache.river.action.GetIntegerAction; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.SocketChannel; -import java.security.AccessController; -import java.util.Collection; -import net.jini.jeri.OutboundRequest; - -/** - * A MuxClient controls the client side of multiplexed connection. - * - * @author Sun Microsystems, Inc. - **/ -public class MuxClient extends Mux { - - /** initial inbound ration as client, default is 32768 */ - private static final int clientInitialInboundRation = - ((Integer) AccessController.doPrivileged(new GetIntegerAction( - "org.apache.river.jeri.connection.mux.client.initialInboundRation", - 32768))).intValue(); - - /** - * Initiates the client side of the multiplexed connection over - * the given input/output stream pair. - * - * @param out the output stream of the underlying connection - * - * @param in the input stream of the underlying connection - **/ - public MuxClient(OutputStream out, InputStream in) throws IOException { - super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024); - } - - public MuxClient(SocketChannel channel) throws IOException { - super(channel, Mux.CLIENT, clientInitialInboundRation, 1024); - } - - /** - * Starts a new request over this connection, returning the - * corresponding OutboundRequest object. - * - * @return the OutboundRequest for the newly created request - **/ - public OutboundRequest newRequest() throws IOException { - synchronized (muxLock) { - if (muxDown) { - IOException ioe = new IOException(muxDownMessage); - ioe.initCause(muxDownCause); - throw ioe; - } - int sessionID = busySessions.nextClearBit(0); - if (sessionID > Mux.MAX_SESSION_ID) { - throw new IOException("no free sessions"); - } - - Session session = new Session(this, sessionID, Session.CLIENT); - addSession(sessionID, session); - return session.getOutboundRequest(); - } - } - - /** - * Returns the current number of requests in progress over this - * connection. - * - * The value is guaranteed to not increase until the next - * invocation of the newRequest method. - * - * @return the number of requests in progress over this connection - * - * @throws IOException if the multiplexed connection is no longer - * active - **/ - public int requestsInProgress() throws IOException { - synchronized (muxLock) { - if (muxDown) { - IOException ioe = new IOException(muxDownMessage); - ioe.initCause(muxDownCause); - throw ioe; - } - return busySessions.cardinality(); - } - } - - /** - * Shuts down this multiplexed connection. Requests in progress - * will throw IOException for future I/O operations. - * - * @param message reason for shutdown to be included in - * IOExceptions thrown from future I/O operations - **/ - public void shutdown(String message) { - synchronized (muxLock) { - setDown(message, null); - } - } - - /** - * Populates the context collection with information representing - * this connection. - * - * This method should be overridden by subclasses to implement the - * desired behavior of the populateContext method for - * OutboundRequest instances generated for this connection. - **/ - protected void populateContext(Collection context) { - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.river.jeri.internal.mux; + +import org.apache.river.action.GetIntegerAction; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.SocketChannel; +import java.security.AccessController; +import java.util.Collection; +import net.jini.jeri.OutboundRequest; + +/** + * A MuxClient controls the client side of multiplexed connection. + * + * @author Sun Microsystems, Inc. + **/ +public class MuxClient extends Mux { + + /** initial inbound ration as client, default is 32768 */ + private static final int clientInitialInboundRation = + ((Integer) AccessController.doPrivileged(new GetIntegerAction( + "org.apache.river.jeri.connection.mux.client.initialInboundRation", + 32768))).intValue(); + + /** + * Initiates the client side of the multiplexed connection over + * the given input/output stream pair. + * + * @param out the output stream of the underlying connection + * + * @param in the input stream of the underlying connection + **/ + public MuxClient(OutputStream out, InputStream in) throws IOException { + super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024); + } + + public MuxClient(SocketChannel channel) throws IOException { + super(channel, Mux.CLIENT, clientInitialInboundRation, 1024); + } + + /** + * Starts a new request over this connection, returning the + * corresponding OutboundRequest object. + * + * @return the OutboundRequest for the newly created request + **/ + public OutboundRequest newRequest() throws IOException { + synchronized (muxLock) { + if (muxDown) { + IOException ioe = new IOException(muxDownMessage); + ioe.initCause(muxDownCause); + throw ioe; + } + byte sessionID = (byte) busySessions.nextClearBit(0); + if (sessionID > Mux.MAX_SESSION_ID) { + throw new IOException("no free sessions"); + } + + Session session = new Session(this, sessionID, Session.CLIENT); + addSession(sessionID, session); + return session.getOutboundRequest(); + } + } + + /** + * Returns the current number of requests in progress over this + * connection. + * + * The value is guaranteed to not increase until the next + * invocation of the newRequest method. + * + * @return the number of requests in progress over this connection + * + * @throws IOException if the multiplexed connection is no longer + * active + **/ + public int requestsInProgress() throws IOException { + synchronized (muxLock) { + if (muxDown) { + IOException ioe = new IOException(muxDownMessage); + ioe.initCause(muxDownCause); + throw ioe; + } + return busySessions.cardinality(); + } + } + + /** + * Shuts down this multiplexed connection. Requests in progress + * will throw IOException for future I/O operations. + * + * @param message reason for shutdown to be included in + * IOExceptions thrown from future I/O operations + **/ + public void shutdown(String message) { + synchronized (muxLock) { + setDown(message, null); + } + } + + /** + * Populates the context collection with information representing + * this connection. + * + * This method should be overridden by subclasses to implement the + * desired behavior of the populateContext method for + * OutboundRequest instances generated for this connection. + **/ + protected void populateContext(Collection context) { + } +} Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java?rev=1716613&r1=1716612&r2=1716613&view=diff ============================================================================== --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java (original) +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java Thu Nov 26 11:56:32 2015 @@ -1,308 +1,308 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.river.jeri.internal.mux; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Deque; -import java.util.LinkedList; - -/** - * Output stream returned by OutboundRequests and InboundRequests for - * a session of a multiplexed connection. - */ -class MuxInputStream extends InputStream { - private final Object sessionLock; - private final Session session; - private final Mux mux; - private final Deque<ByteBuffer> inBufQueue; - private IOException sessionDown = null; - private int inBufRemaining = 0; - private int inBufPos = 0; - private boolean inEOF = false; - private boolean inClosed = false; - private boolean sentAcknowledgment = false; - - MuxInputStream(Mux mux, Session session, Object sessionLock) { - this.mux = mux; - this.session = session; - this.sessionLock = sessionLock; - this.inBufQueue = new LinkedList<ByteBuffer>(); - } - - void down(IOException e) { - sessionDown = e; - } - - void appendToBufQueue(ByteBuffer data) { - inBufQueue.addLast(data); - } - - @Override - public int read() throws IOException { - synchronized (sessionLock) { - if (inClosed) { - throw new IOException("stream closed"); - } - while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) { - if (session.getInState() == Session.IDLE) { - assert session.getOutState() == Session.IDLE; - mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null); - session.setOutState(Session.OPEN); - session.setInState(Session.OPEN); - } - if (!session.inRationInfinite && session.getInRation() == 0) { - int inc = mux.initialInboundRation; - mux.asyncSendIncrementRation(session.sessionID, inc); - session.setInRation(session.getInRation() + inc); - } - try { - sessionLock.wait(); // REMIND: timeout? - } catch (InterruptedException e) { - String message = "request I/O interrupted"; - session.setDown(message, e); - throw wrap(message, e); - } - } - if (inClosed) { - throw new IOException("stream closed"); - } - if (inBufRemaining == 0) { - if (inEOF) { - return -1; - } else { - if (session.getInState() == Session.TERMINATED) { - throw new IOException("request aborted by remote endpoint"); - } - assert sessionDown != null; - throw sessionDown; - } - } - assert inBufQueue.size() > 0; - int result = -1; - while (result == -1) { - ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); - if (inBufPos < buf.limit()) { - result = (buf.get() & 0xFF); - inBufPos++; - inBufRemaining--; - } - if (inBufPos == buf.limit()) { - inBufQueue.removeFirst(); - inBufPos = 0; - } - } - if (!session.inRationInfinite) { - checkInboundRation(); - } - return result; - } - } - - private IOException wrap(String message, Exception e) { - Throwable t; - if (Session.traceSupression()) { - t = e; - } else { - t = e.fillInStackTrace(); - } - return new IOException(message, t); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } - synchronized (sessionLock) { - if (inClosed) { - throw new IOException("stream closed"); - } else if (len == 0) { - /* - * REMIND: What if - * - stream is at EOF? - * - session was aborted? - */ - return 0; - } - while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) { - if (session.getInState() == Session.IDLE) { - assert session.getOutState() == Session.IDLE; - mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null); - session.setOutState(Session.OPEN); - session.setInState(Session.OPEN); - } - if (!session.inRationInfinite && session.getInRation() == 0) { - int inc = mux.initialInboundRation; - mux.asyncSendIncrementRation(session.sessionID, inc); - session.setInRation(session.getInRation() + inc); - } - try { - sessionLock.wait(); // REMIND: timeout? - } catch (InterruptedException e) { - String message = "request I/O interrupted"; - session.setDown(message, e); - throw wrap(message, e); - } - } - if (inClosed) { - throw new IOException("stream closed"); - } - if (inBufRemaining == 0) { - if (inEOF) { - return -1; - } else { - if (session.getInState() == Session.TERMINATED) { - throw new IOException("request aborted by remote endpoint"); - } - assert sessionDown != null; - throw sessionDown; - } - } - assert inBufQueue.size() > 0; - int remaining = len; - while (remaining > 0 && inBufRemaining > 0) { - ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); - if (inBufPos < buf.limit()) { - int toCopy = Math.min(buf.limit() - inBufPos, remaining); - buf.get(b, off, toCopy); - inBufPos += toCopy; - inBufRemaining -= toCopy; - off += toCopy; - remaining -= toCopy; - } - if (inBufPos == buf.limit()) { - inBufQueue.removeFirst(); - inBufPos = 0; - } - } - if (!session.inRationInfinite) { - checkInboundRation(); - } - return len - remaining; - } - } - - /** - * Sends ration increment, if read drained buffers below - * a certain mark. - * - * This method must NOT be invoked if the inbound ration in - * infinite, and it must ONLY be invoked while synchronized on - * this session's lock. - * - * REMIND: The implementation of this action will be a - * significant area for performance tuning. - */ - private void checkInboundRation() { - assert Thread.holdsLock(sessionLock); - assert !session.inRationInfinite; - if (session.getInState() >= Session.FINISHED) { - return; - } - int mark = mux.initialInboundRation / 2; - int used = inBufRemaining + session.getInRation(); - if (used <= mark) { - int inc = mux.initialInboundRation - used; - mux.asyncSendIncrementRation(session.sessionID, inc); - session.setInRation(session.getInRation() + inc); - } - } - - @Override - public int available() throws IOException { - synchronized (sessionLock) { - if (inClosed) { - throw new IOException("stream closed"); - } - /* - * REMIND: What if - * - stream is at EOF? - * - session was aborted? - */ - return inBufRemaining; - } - } - - @Override - public void close() { - synchronized (sessionLock) { - if (inClosed) { - return; - } - inClosed = true; - inBufQueue.clear(); // allow GC of unread data - if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) { - mux.asyncSendAcknowledgment(session.sessionID); - sentAcknowledgment = true; - /* - * If removing this session from the connection's - * table was delayed in order to be able to send - * an Acknowledgment message, then take care of - * removing it now. - */ - if (session.isRemoveLater()) { - session.setOutState(Session.TERMINATED); - mux.removeSession(session.sessionID); - session.setRemoveLater(false); - } - } - sessionLock.notifyAll(); - } - } - - /** - * @return the sentAcknowledgment - */ - boolean isSentAcknowledgment() { - return sentAcknowledgment; - } - - /** - * @return the inBufRemaining - */ - int getBufRemaining() { - return inBufRemaining; - } - - /** - * @return the inClosed - */ - boolean isClosed() { - return inClosed; - } - - /** - * @param inBufRemaining the inBufRemaining to set - */ - void setBufRemaining(int inBufRemaining) { - this.inBufRemaining = inBufRemaining; - } - - /** - * @param inEOF the inEOF to set - */ - void setEOF(boolean inEOF) { - this.inEOF = inEOF; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.river.jeri.internal.mux; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Deque; +import java.util.LinkedList; + +/** + * Output stream returned by OutboundRequests and InboundRequests for + * a session of a multiplexed connection. + */ +class MuxInputStream extends InputStream { + private final Object sessionLock; + private final Session session; + private final Mux mux; + private final Deque<ByteBuffer> inBufQueue; + private IOException sessionDown = null; + private int inBufRemaining = 0; + private int inBufPos = 0; + private boolean inEOF = false; + private boolean inClosed = false; + private boolean sentAcknowledgment = false; + + MuxInputStream(Mux mux, Session session, Object sessionLock) { + this.mux = mux; + this.session = session; + this.sessionLock = sessionLock; + this.inBufQueue = new LinkedList<ByteBuffer>(); + } + + void down(IOException e) { + sessionDown = e; + } + + void appendToBufQueue(ByteBuffer data) { + inBufQueue.addLast(data); + } + + @Override + public int read() throws IOException { + synchronized (sessionLock) { + if (inClosed) { + throw new IOException("stream closed"); + } + while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) { + if (session.getInState() == Session.IDLE) { + assert session.getOutState() == Session.IDLE; + mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null); + session.setOutState(Session.OPEN); + session.setInState(Session.OPEN); + } + if (!session.inRationInfinite && session.getInRation() == 0) { + int inc = mux.initialInboundRation; + mux.asyncSendIncrementRation(session.sessionID, inc); + session.setInRation(session.getInRation() + inc); + } + try { + sessionLock.wait(5000L); // REMIND: timeout? + } catch (InterruptedException e) { + String message = "request I/O interrupted"; + session.setDown(message, e); + throw wrap(message, e); + } + } + if (inClosed) { + throw new IOException("stream closed"); + } + if (inBufRemaining == 0) { + if (inEOF) { + return -1; + } else { + if (session.getInState() == Session.TERMINATED) { + throw new IOException("request aborted by remote endpoint"); + } + assert sessionDown != null; + throw sessionDown; + } + } + assert inBufQueue.size() > 0; + int result = -1; + while (result == -1) { + ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); + if (inBufPos < buf.limit()) { + result = (buf.get() & 0xFF); + inBufPos++; + inBufRemaining--; + } + if (inBufPos == buf.limit()) { + inBufQueue.removeFirst(); + inBufPos = 0; + } + } + if (!session.inRationInfinite) { + checkInboundRation(); + } + return result; + } + } + + private IOException wrap(String message, Exception e) { + Throwable t; + if (Session.traceSupression()) { + t = e; + } else { + t = e.fillInStackTrace(); + } + return new IOException(message, t); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } + synchronized (sessionLock) { + if (inClosed) { + throw new IOException("stream closed"); + } else if (len == 0) { + /* + * REMIND: What if + * - stream is at EOF? + * - session was aborted? + */ + return 0; + } + while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) { + if (session.getInState() == Session.IDLE) { + assert session.getOutState() == Session.IDLE; + mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null); + session.setOutState(Session.OPEN); + session.setInState(Session.OPEN); + } + if (!session.inRationInfinite && session.getInRation() == 0) { + int inc = mux.initialInboundRation; + mux.asyncSendIncrementRation(session.sessionID, inc); + session.setInRation(session.getInRation() + inc); + } + try { + sessionLock.wait(5000L); // REMIND: timeout? + } catch (InterruptedException e) { + String message = "request I/O interrupted"; + session.setDown(message, e); + throw wrap(message, e); + } + } + if (inClosed) { + throw new IOException("stream closed"); + } + if (inBufRemaining == 0) { + if (inEOF) { + return -1; + } else { + if (session.getInState() == Session.TERMINATED) { + throw new IOException("request aborted by remote endpoint"); + } + assert sessionDown != null; + throw sessionDown; + } + } + assert inBufQueue.size() > 0; + int remaining = len; + while (remaining > 0 && inBufRemaining > 0) { + ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); + if (inBufPos < buf.limit()) { + int toCopy = Math.min(buf.limit() - inBufPos, remaining); + buf.get(b, off, toCopy); + inBufPos += toCopy; + inBufRemaining -= toCopy; + off += toCopy; + remaining -= toCopy; + } + if (inBufPos == buf.limit()) { + inBufQueue.removeFirst(); + inBufPos = 0; + } + } + if (!session.inRationInfinite) { + checkInboundRation(); + } + return len - remaining; + } + } + + /** + * Sends ration increment, if read drained buffers below + * a certain mark. + * + * This method must NOT be invoked if the inbound ration in + * infinite, and it must ONLY be invoked while synchronized on + * this session's lock. + * + * REMIND: The implementation of this action will be a + * significant area for performance tuning. + */ + private void checkInboundRation() { + assert Thread.holdsLock(sessionLock); + assert !session.inRationInfinite; + if (session.getInState() >= Session.FINISHED) { + return; + } + int mark = mux.initialInboundRation / 2; + int used = inBufRemaining + session.getInRation(); + if (used <= mark) { + int inc = mux.initialInboundRation - used; + mux.asyncSendIncrementRation(session.sessionID, inc); + session.setInRation(session.getInRation() + inc); + } + } + + @Override + public int available() throws IOException { + synchronized (sessionLock) { + if (inClosed) { + throw new IOException("stream closed"); + } + /* + * REMIND: What if + * - stream is at EOF? + * - session was aborted? + */ + return inBufRemaining; + } + } + + @Override + public void close() { + synchronized (sessionLock) { + if (inClosed) { + return; + } + inClosed = true; + inBufQueue.clear(); // allow GC of unread data + if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) { + mux.asyncSendAcknowledgment(session.sessionID); + sentAcknowledgment = true; + /* + * If removing this session from the connection's + * table was delayed in order to be able to send + * an Acknowledgment message, then take care of + * removing it now. + */ + if (session.isRemoveLater()) { + session.setOutState(Session.TERMINATED); + mux.removeSession(session.sessionID); + session.setRemoveLater(false); + } + } + sessionLock.notifyAll(); + } + } + + /** + * @return the sentAcknowledgment + */ + boolean isSentAcknowledgment() { + return sentAcknowledgment; + } + + /** + * @return the inBufRemaining + */ + int getBufRemaining() { + return inBufRemaining; + } + + /** + * @return the inClosed + */ + boolean isClosed() { + return inClosed; + } + + /** + * @param inBufRemaining the inBufRemaining to set + */ + void setBufRemaining(int inBufRemaining) { + this.inBufRemaining = inBufRemaining; + } + + /** + * @param inEOF the inEOF to set + */ + void setEOF(boolean inEOF) { + this.inEOF = inEOF; + } + +} Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java?rev=1716613&r1=1716612&r2=1716613&view=diff ============================================================================== --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java (original) +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java Thu Nov 26 11:56:32 2015 @@ -210,7 +210,7 @@ public class MuxServer extends Mux { dispatchNewRequest(sessionID); return; } else { - session = (Session) sessions.get(Integer.valueOf(sessionID)); + session = sessions[sessionID]; assert session != null; } } Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java?rev=1716613&r1=1716612&r2=1716613&view=diff ============================================================================== --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java (original) +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java Thu Nov 26 11:56:32 2015 @@ -1,434 +1,434 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.river.jeri.internal.mux; - -import org.apache.river.logging.Levels; -import org.apache.river.thread.Executor; -import org.apache.river.thread.GetThreadPoolAction; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.security.AccessController; -import java.util.Deque; -import java.util.LinkedList; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * StreamConnectionIO implements the ConnectionIO abstraction for a - * connection accessible through standard (blocking) I/O streams, i.e. - * java.io.OutputStream and java.io.InputStream. - * - * @author Sun Microsystems, Inc. - **/ -final class StreamConnectionIO extends ConnectionIO { - - private static final int RECEIVE_BUFFER_SIZE = 2048; - - /** - * pool of threads for executing tasks in system thread group: - * used for I/O (reader and writer) threads - */ - private static final Executor systemThreadPool = - (Executor) AccessController.doPrivileged( - new GetThreadPoolAction(false)); - - /** mux logger */ - private static final Logger logger = - Logger.getLogger("net.jini.jeri.connection.mux"); - - /** I/O streams for underlying connection */ - private final OutputStream out; - private final InputStream in; - - /** channels wrapped around underlying I/O streams */ - private final WritableByteChannel outChannel; - private final ReadableByteChannel inChannel; - - /** - * queue of buffers of data to be sent over connection, interspersed - * with IOFuture objects that need to be notified in sequence - * - * Synchronised on super.mux.muxLock; - */ - private final Deque sendQueue; - - - /** - * Creates a new StreamConnectionIO for the connection represented by - * the supplied OutputStream and InputStream pair. - */ - StreamConnectionIO(Mux mux, OutputStream out, InputStream in) { - super(mux); - this.out = out; -// this.out = new BufferedOutputStream(out); - this.in = in; - - outChannel = newChannel(out); - inChannel = newChannel(in); - sendQueue = new LinkedList(); - } - - /** - * Starts processing connection data. This method starts - * asynchronous actions to read and write from the connection. - */ - @Override - void start() throws IOException { - try { - systemThreadPool.execute(new Writer(), "mux writer"); - systemThreadPool.execute(new Reader(), "mux reader"); - } catch (OutOfMemoryError e) { // assume out of threads - try { - logger.log(Level.WARNING, - "could not create thread for request dispatch", e); - } catch (Throwable t) { - } - throw new IOException("could not create I/O threads", e); - } - } - - @Override - void asyncSend(ByteBuffer buffer) { - synchronized (mux.muxLock) { - if (mux.muxDown) { - return; - } - sendQueue.addLast(buffer); - mux.muxLock.notifyAll(); - } - } - - @Override - void asyncSend(ByteBuffer first, ByteBuffer second) { - synchronized (mux.muxLock) { - if (mux.muxDown) { - return; - } - sendQueue.addLast(first); - sendQueue.addLast(second); - mux.muxLock.notifyAll(); - } - } - - @Override - IOFuture futureSend(ByteBuffer first, ByteBuffer second) { - synchronized (mux.muxLock) { - IOFuture future = new IOFuture(); - if (mux.muxDown) { - IOException ioe = new IOException(mux.muxDownMessage); - ioe.initCause(mux.muxDownCause); - future.done(ioe); - return future; - } - sendQueue.addLast(first); - sendQueue.addLast(second); - sendQueue.addLast(future); - mux.muxLock.notifyAll(); - return future; - } - /* - * REMIND: Can/should we implement any sort of - * priority inversion avoidance scheme here? - */ - } - - private class Writer implements Runnable { - Writer() { } - - @Override - public void run() { - Deque localQueue = null; - try { - while (true) { - synchronized (mux.muxLock) { - while (!mux.muxDown && sendQueue.isEmpty()) { - /* - * REMIND: Should we use a timeout here, to send - * occasional PING messages during periods of - * inactivity, to make sure connection is alive? - */ - mux.muxLock.wait(); - /* - * Let an interrupt during the wait just kill this - * thread, because an interrupt during an I/O write - * would leave it in an unrecoverable state anyway. - */ - } - if (mux.muxDown && sendQueue.isEmpty()) { - logger.log(Level.FINEST, - "mux writer thread dying, connection " + - "down and nothing more to send"); - break; - } - /* Clone an unshared copy and clear the queue while synchronized */ - localQueue = new LinkedList(sendQueue); - sendQueue.clear(); - } - - boolean needToFlush = false; - ByteBuffer last = null; - int lastIndex = Integer.MIN_VALUE; - for ( int i = 0; !localQueue.isEmpty(); i++) { - Object next = localQueue.getFirst(); - if (next instanceof ByteBuffer) { - ByteBuffer buffer = (ByteBuffer) next; - outChannel.write((buffer)); - last = buffer; - lastIndex = i; - needToFlush = true; - } else { - assert next instanceof IOFuture; - if (needToFlush) { - out.flush(); - needToFlush = false; - } - if (lastIndex == i - 1 && last != null){ - ((IOFuture) next).done(last.position()); - } else { - ((IOFuture) next).done(); - } - } - localQueue.removeFirst(); - } - if (needToFlush) { - out.flush(); - } - } - } catch (InterruptedException e) { - try { - logger.log(Level.WARNING, - "mux writer thread dying, interrupted", e); - } catch (Throwable t) { - } - mux.setDown("mux writer thread interrupted", e); - } catch (IOException e) { - try { - logger.log(Levels.HANDLED, - "mux writer thread dying, I/O error", e); - } catch (Throwable t) { - } - mux.setDown("I/O error writing to mux connection: " + - e.toString(), e); - } catch (Throwable t) { - try { - logger.log(Level.WARNING, - "mux writer thread dying, unexpected exception", t); - } catch (Throwable tt) { - } - mux.setDown("unexpected exception in mux writer thread: " + - t.toString(), t); - } finally { - synchronized (mux.muxLock) { - assert mux.muxDown; - if (localQueue != null) { - drainQueue(localQueue); - } - drainQueue(sendQueue); - } - try { - outChannel.close(); - } catch (IOException e) { - } - } - } - } - - private void drainQueue(Deque queue) { - while (!queue.isEmpty()) { - Object next = queue.removeFirst(); - if (next instanceof IOFuture) { - IOException ioe = new IOException(mux.muxDownMessage); - ioe.initCause(mux.muxDownCause); - ((IOFuture) next).done(ioe); - } - } - } - - private class Reader implements Runnable { - /** buffer for reading incoming data from connection */ - private final ByteBuffer inputBuffer = - ByteBuffer.allocate(RECEIVE_BUFFER_SIZE); // ready for reading - - Reader() { } - - public void run() { - try { - while (true) { - int n = inChannel.read(inputBuffer); - if (n == -1) { - throw new EOFException(); - } - assert n > 0; // channel is assumed to be blocking - mux.processIncomingData(inputBuffer); - assert inputBuffer.hasRemaining(); - } - } catch (ProtocolException e) { - IOFuture future = null; - synchronized (mux.muxLock) { - /* - * If mux connection is already down, then we probably got - * here because of the receipt of a normal protocol-ending - * message, like Shutdown or Error, or else something else - * went wrong anyway. Otherwise, a real protocol violation - * was detected, so respond with an Error message before - * taking down the whole mux connection. - */ - if (!mux.muxDown) { - try { - logger.log(Levels.HANDLED, - "mux reader thread dying, protocol error", e); - } catch (Throwable t) { - } - future = mux.futureSendError(e.getMessage()); - mux.setDown("protocol violation detected: " + - e.getMessage(), null); - } else { - try { - logger.log(Level.FINEST, - "mux reader thread dying: " + e.getMessage()); - } catch (Throwable t) { - } - } - } - if (future != null) { - try { - future.waitUntilDone(); - } catch (IOException ignore) { - } catch (InterruptedException interrupt) { - Thread.currentThread().interrupt(); - } - } - } catch (IOException e) { - try { - logger.log(Levels.HANDLED, - "mux reader thread dying, I/O error", e); - } catch (Throwable t) { - } - mux.setDown("I/O error reading from mux connection: " + - e.toString(), e); - } catch (Throwable t) { - try { - logger.log(Level.WARNING, - "mux reader thread dying, unexpected exception", t); - } catch (Throwable tt) { - } - mux.setDown("unexpected exception in mux reader thread: " + - t.toString(), t); - } finally { - try { - inChannel.close(); - } catch (IOException e) { - } - } - } - } - - /** - * The following two methods are modifications of their - * equivalents in java.nio.channels.Channels with the assumption - * that the supplied byte buffers are backed by arrays, so no - * additional copying is required. - */ - - public static ReadableByteChannel newChannel(final InputStream in) { - return new ReadableByteChannel() { - private volatile boolean open = true; - - // must be synchronized as per ReadableByteChannel contract - @Override - public synchronized int read(ByteBuffer dst) throws IOException { - assert dst.hasArray(); - byte[] array = dst.array(); - int arrayOffset = dst.arrayOffset(); - - int totalRead = 0; - int bytesRead = 0; - int bytesToRead; - while ((bytesToRead = dst.remaining()) > 0) { - if ((totalRead > 0) && !(in.available() > 0)) { - break; // block at most once - } - int pos = dst.position(); - bytesRead = in.read(array, arrayOffset + pos, bytesToRead); - if (bytesRead < 0) { - break; - } else { - dst.position(pos + bytesRead); - totalRead += bytesRead; - } - } - if ((bytesRead < 0) && (totalRead == 0)) { - return -1; - } - - return totalRead; - } - - @Override - public boolean isOpen() { - return open; - } - - // Blocking as per Channel contract - @Override - public synchronized void close() throws IOException { - in.close(); - open = false; - } - }; - } - - public static WritableByteChannel newChannel(final OutputStream out) { - return new WritableByteChannel() { - private volatile boolean open = true; - - // This method must block while writing as per WritableByteChannel contract. - @Override - public synchronized int write(ByteBuffer src) throws IOException { - assert src.hasArray(); - - int len = src.remaining(); - if (len > 0) { - int pos = src.position(); - out.write(src.array(), src.arrayOffset() + pos, len); - src.position(pos + len); - } - return len; - } - - @Override - public boolean isOpen() { - return open; - } - - // This method must block as per the Channel contract - @Override - public synchronized void close() throws IOException { - out.close(); - open = false; - } - }; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.river.jeri.internal.mux; + +import org.apache.river.logging.Levels; +import org.apache.river.thread.Executor; +import org.apache.river.thread.GetThreadPoolAction; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.security.AccessController; +import java.util.Deque; +import java.util.LinkedList; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * StreamConnectionIO implements the ConnectionIO abstraction for a + * connection accessible through standard (blocking) I/O streams, i.e. + * java.io.OutputStream and java.io.InputStream. + * + * @author Sun Microsystems, Inc. + **/ +final class StreamConnectionIO extends ConnectionIO { + + private static final int RECEIVE_BUFFER_SIZE = 2048; + + /** + * pool of threads for executing tasks in system thread group: + * used for I/O (reader and writer) threads + */ + private static final Executor systemThreadPool = + (Executor) AccessController.doPrivileged( + new GetThreadPoolAction(false)); + + /** mux logger */ + private static final Logger logger = + Logger.getLogger("net.jini.jeri.connection.mux"); + + /** I/O streams for underlying connection */ + private final OutputStream out; + private final InputStream in; + + /** channels wrapped around underlying I/O streams */ + private final WritableByteChannel outChannel; + private final ReadableByteChannel inChannel; + + /** + * queue of buffers of data to be sent over connection, interspersed + * with IOFuture objects that need to be notified in sequence + * + * Synchronised on super.mux.muxLock; + */ + private final Deque sendQueue; + + + /** + * Creates a new StreamConnectionIO for the connection represented by + * the supplied OutputStream and InputStream pair. + */ + StreamConnectionIO(Mux mux, OutputStream out, InputStream in) { + super(mux); + this.out = out; +// this.out = new BufferedOutputStream(out); + this.in = in; + + outChannel = newChannel(out); + inChannel = newChannel(in); + sendQueue = new LinkedList(); + } + + /** + * Starts processing connection data. This method starts + * asynchronous actions to read and write from the connection. + */ + @Override + void start() throws IOException { + try { + systemThreadPool.execute(new Writer(), "mux writer"); + systemThreadPool.execute(new Reader(), "mux reader"); + } catch (OutOfMemoryError e) { // assume out of threads + try { + logger.log(Level.WARNING, + "could not create thread for request dispatch", e); + } catch (Throwable t) { + } + throw new IOException("could not create I/O threads", e); + } + } + + @Override + void asyncSend(ByteBuffer buffer) { + synchronized (mux.muxLock) { + if (mux.muxDown) { + return; + } + sendQueue.addLast(buffer); + mux.muxLock.notifyAll(); + } + } + + @Override + void asyncSend(ByteBuffer first, ByteBuffer second) { + synchronized (mux.muxLock) { + if (mux.muxDown) { + return; + } + sendQueue.addLast(first); + sendQueue.addLast(second); + mux.muxLock.notifyAll(); + } + } + + @Override + IOFuture futureSend(ByteBuffer first, ByteBuffer second) { + synchronized (mux.muxLock) { + IOFuture future = new IOFuture(); + if (mux.muxDown) { + IOException ioe = new IOException(mux.muxDownMessage); + ioe.initCause(mux.muxDownCause); + future.done(ioe); + return future; + } + sendQueue.addLast(first); + sendQueue.addLast(second); + sendQueue.addLast(future); + mux.muxLock.notifyAll(); + return future; + } + /* + * REMIND: Can/should we implement any sort of + * priority inversion avoidance scheme here? + */ + } + + private class Writer implements Runnable { + Writer() { } + + @Override + public void run() { + Deque localQueue = null; + try { + while (true) { + synchronized (mux.muxLock) { + while (!mux.muxDown && sendQueue.isEmpty()) { + /* + * REMIND: Should we use a timeout here, to send + * occasional PING messages during periods of + * inactivity, to make sure connection is alive? + */ + mux.muxLock.wait(); + /* + * Let an interrupt during the wait just kill this + * thread, because an interrupt during an I/O write + * would leave it in an unrecoverable state anyway. + */ + } + if (mux.muxDown && sendQueue.isEmpty()) { + logger.log(Level.FINEST, + "mux writer thread dying, connection " + + "down and nothing more to send"); + break; + } + /* Clone an unshared copy and clear the queue while synchronized */ + localQueue = new LinkedList(sendQueue); + sendQueue.clear(); + } + + boolean needToFlush = false; + ByteBuffer last = null; + int lastIndex = Integer.MIN_VALUE; + for ( int i = 0; !localQueue.isEmpty(); i++) { + Object next = localQueue.getFirst(); + if (next instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) next; + outChannel.write((buffer)); + last = buffer; + lastIndex = i; + needToFlush = true; + } else { + assert next instanceof IOFuture; + if (needToFlush) { + out.flush(); + needToFlush = false; + } + if (lastIndex == i - 1 && last != null){ + ((IOFuture) next).done(last.position()); + } else { + ((IOFuture) next).done(); + } + } + localQueue.removeFirst(); + } + if (needToFlush) { + out.flush(); + } + } + } catch (InterruptedException e) { + try { + logger.log(Level.WARNING, + "mux writer thread dying, interrupted", e); + } catch (Throwable t) { + } + mux.setDown("mux writer thread interrupted", e); + } catch (IOException e) { + try { + logger.log(Levels.HANDLED, + "mux writer thread dying, I/O error", e); + } catch (Throwable t) { + } + mux.setDown("I/O error writing to mux connection: " + + e.toString(), e); + } catch (Throwable t) { + try { + logger.log(Level.WARNING, + "mux writer thread dying, unexpected exception", t); + } catch (Throwable tt) { + } + mux.setDown("unexpected exception in mux writer thread: " + + t.toString(), t); + } finally { + synchronized (mux.muxLock) { + assert mux.muxDown; + if (localQueue != null) { + drainQueue(localQueue); + } + drainQueue(sendQueue); + } + try { + outChannel.close(); + } catch (IOException e) { + } + } + } + } + + private void drainQueue(Deque queue) { + while (!queue.isEmpty()) { + Object next = queue.removeFirst(); + if (next instanceof IOFuture) { + IOException ioe = new IOException(mux.muxDownMessage); + ioe.initCause(mux.muxDownCause); + ((IOFuture) next).done(ioe); + } + } + } + + private class Reader implements Runnable { + /** buffer for reading incoming data from connection */ + private final ByteBuffer inputBuffer = + ByteBuffer.allocate(RECEIVE_BUFFER_SIZE); // ready for reading + + Reader() { } + + public void run() { + try { + while (true) { + int n = inChannel.read(inputBuffer); + if (n == -1) { + throw new EOFException(); + } + assert n > 0; // channel is assumed to be blocking + mux.processIncomingData(inputBuffer); + assert inputBuffer.hasRemaining(); + } + } catch (ProtocolException e) { + IOFuture future = null; + synchronized (mux.muxLock) { + /* + * If mux connection is already down, then we probably got + * here because of the receipt of a normal protocol-ending + * message, like Shutdown or Error, or else something else + * went wrong anyway. Otherwise, a real protocol violation + * was detected, so respond with an Error message before + * taking down the whole mux connection. + */ + if (!mux.muxDown) { + try { + logger.log(Levels.HANDLED, + "mux reader thread dying, protocol error", e); + } catch (Throwable t) { + } + future = mux.futureSendError(e.getMessage()); + mux.setDown("protocol violation detected: " + + e.getMessage(), null); + } else { + try { + logger.log(Level.FINEST, + "mux reader thread dying: " + e.getMessage()); + } catch (Throwable t) { + } + } + } + if (future != null) { + try { + future.waitUntilDone(); + } catch (IOException ignore) { + } catch (InterruptedException interrupt) { + Thread.currentThread().interrupt(); + } + } + } catch (IOException e) { + try { + logger.log(Levels.HANDLED, + "mux reader thread dying, I/O error", e); + } catch (Throwable t) { + } + mux.setDown("I/O error reading from mux connection: " + + e.toString(), e); + } catch (Throwable t) { + try { + logger.log(Level.WARNING, + "mux reader thread dying, unexpected exception", t); + } catch (Throwable tt) { + } + mux.setDown("unexpected exception in mux reader thread: " + + t.toString(), t); + } finally { + try { + inChannel.close(); + } catch (IOException e) { + } + } + } + } + + /** + * The following two methods are modifications of their + * equivalents in java.nio.channels.Channels with the assumption + * that the supplied byte buffers are backed by arrays, so no + * additional copying is required. + */ + + public static ReadableByteChannel newChannel(final InputStream in) { + return new ReadableByteChannel() { + private boolean open = true; + + // must be synchronized as per ReadableByteChannel contract + @Override + public synchronized int read(ByteBuffer dst) throws IOException { + assert dst.hasArray(); + byte[] array = dst.array(); + int arrayOffset = dst.arrayOffset(); + + int totalRead = 0; + int bytesRead = 0; + int bytesToRead; + while ((bytesToRead = dst.remaining()) > 0) { + if ((totalRead > 0) && !(in.available() > 0)) { + break; // block at most once + } + int pos = dst.position(); + bytesRead = in.read(array, arrayOffset + pos, bytesToRead); + if (bytesRead < 0) { + break; + } else { + dst.position(pos + bytesRead); + totalRead += bytesRead; + } + } + if ((bytesRead < 0) && (totalRead == 0)) { + return -1; + } + + return totalRead; + } + + @Override + public synchronized boolean isOpen() { + return open; + } + + // Blocking as per Channel contract + @Override + public synchronized void close() throws IOException { + in.close(); + open = false; + } + }; + } + + public static WritableByteChannel newChannel(final OutputStream out) { + return new WritableByteChannel() { + private volatile boolean open = true; + + // This method must block while writing as per WritableByteChannel contract. + @Override + public synchronized int write(ByteBuffer src) throws IOException { + assert src.hasArray(); + + int len = src.remaining(); + if (len > 0) { + int pos = src.position(); + out.write(src.array(), src.arrayOffset() + pos, len); + src.position(pos + len); + } + return len; + } + + @Override + public boolean isOpen() { + return open; + } + + // This method must block as per the Channel contract + @Override + public synchronized void close() throws IOException { + out.close(); + open = false; + } + }; + } + +}
