In order to properly review these changes, it would be great to know what problem you’re fixing - could you share?
Cheers, Greg Trasuk > On Nov 26, 2015, at 6:56 AM, peter_firmst...@apache.org wrote: > > Author: peter_firmstone > Date: Thu Nov 26 11:56:32 2015 > New Revision: 1716613 > > URL: http://svn.apache.org/viewvc?rev=1716613&view=rev > Log: > Commit my local Jeri multiplexer stability improvements to assist with jtreg > multiplexer nio tests: > > net.jini.jeri.tcp.outOfThreads.OutOfThreads.java > net.jini.jeri.tcp.outOfThreads.OutOfThreads2.java > > Modified: > > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java > > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java > > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java > > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java > > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java > > Modified: > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java > URL: > http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java?rev=1716613&r1=1716612&r2=1716613&view=diff > ============================================================================== > --- > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java > (original) > +++ > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java > Thu Nov 26 11:56:32 2015 > @@ -34,9 +34,7 @@ import java.nio.charset.CharsetEncoder; > import java.security.AccessController; > import java.util.BitSet; > import java.util.Deque; > -import java.util.HashMap; > import java.util.LinkedList; > -import java.util.Map; > import java.util.logging.Level; > import java.util.logging.Logger; > > @@ -107,6 +105,7 @@ abstract class Mux { > > public void run() { > for (int i = 0; i < 
sessions.length; i++) { > + if (sessions[i] != null) > sessions[i].setDown(message, cause); > } > } > @@ -135,7 +134,7 @@ abstract class Mux { > Throwable muxDownCause; > > final BitSet busySessions = new BitSet(); > - final Map<Integer,Session> sessions = new HashMap<Integer,Session>(128); > + final Session [] sessions = new Session[MAX_SESSION_ID + 1]; > > private int expectedPingCookie = -1; > > @@ -274,10 +273,15 @@ abstract class Mux { > assert Thread.holdsLock(muxLock); > assert !muxDown; > assert !busySessions.get(sessionID); > - assert sessions.get(Integer.valueOf(sessionID)) == null; > +// assert sessions.get(Byte.valueOf(sessionID)) == null; > + assert sessions[sessionID] == null; > > busySessions.set(sessionID); > - sessions.put(Integer.valueOf(sessionID), session); > +// Throwable t = new Throwable(); > +// System.out.println("Setting sessionID: "+ sessionID); > +// t.printStackTrace(System.out); > +// sessions.put(Byte.valueOf(sessionID), session); > + sessions[sessionID] = session; > } > > /** > @@ -285,14 +289,17 @@ abstract class Mux { > * This method is intended to be invoked by this class and > * subclasses only. > * > - * This method MAY be invoked while synchronized on muxLock. > + * This method MAY be invoked while synchronized on muxLock if failure > + * occurs during start up. 
> */ > final void setDown(final String message, final Throwable cause) { > + SessionShutdownTask sst = null; > synchronized (muxLock) { > if (muxDown) return; > muxDown = true; > muxDownMessage = message; > muxDownCause = cause; > + sst = new SessionShutdownTask(sessions.clone(), message, cause); > muxLock.notifyAll(); > } > > @@ -309,11 +316,8 @@ abstract class Mux { > */ > boolean needWorker = false; > synchronized (sessionShutdownQueue) { > - if (!sessions.isEmpty()) { > - sessionShutdownQueue.add(new SessionShutdownTask( > - (Session[]) sessions.values().toArray( > - new Session[sessions.values().size()]), > - message, cause)); > + if (sst != null) { > + sessionShutdownQueue.add(sst); > needWorker = true; > } else { > needWorker = !sessionShutdownQueue.isEmpty(); > @@ -360,7 +364,7 @@ abstract class Mux { > } > assert busySessions.get(sessionID); > busySessions.clear(sessionID); > - sessions.remove(Integer.valueOf(sessionID)); > + sessions[sessionID] = null; > } > } > > @@ -1178,8 +1182,7 @@ abstract class Mux { > getSession(sessionID).handleAcknowledgment(); > } > > - private void handleData(int sessionID, boolean open, boolean close, > - boolean eof, boolean ackRequired, ByteBuffer data) > + private void handleData(int sessionID, boolean open, boolean close, > boolean eof, boolean ackRequired, ByteBuffer data) > throws ProtocolException > { > if (logger.isLoggable(Level.FINEST)) { > @@ -1219,7 +1222,7 @@ abstract class Mux { > throw new ProtocolException( > "inactive sessionID: " + sessionID); > } > - return (Session) sessions.get(Integer.valueOf(sessionID)); > + return sessions[sessionID]; > } > } > > > Modified: > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java > URL: > http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java?rev=1716613&r1=1716612&r2=1716613&view=diff > 
============================================================================== > --- > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java > (original) > +++ > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java > Thu Nov 26 11:56:32 2015 > @@ -1,129 +1,129 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > - > -package org.apache.river.jeri.internal.mux; > - > -import org.apache.river.action.GetIntegerAction; > -import java.io.IOException; > -import java.io.InputStream; > -import java.io.OutputStream; > -import java.nio.channels.SocketChannel; > -import java.security.AccessController; > -import java.util.Collection; > -import net.jini.jeri.OutboundRequest; > - > -/** > - * A MuxClient controls the client side of multiplexed connection. > - * > - * @author Sun Microsystems, Inc. 
> - **/ > -public class MuxClient extends Mux { > - > - /** initial inbound ration as client, default is 32768 */ > - private static final int clientInitialInboundRation = > - ((Integer) AccessController.doPrivileged(new GetIntegerAction( > - "org.apache.river.jeri.connection.mux.client.initialInboundRation", > - 32768))).intValue(); > - > - /** > - * Initiates the client side of the multiplexed connection over > - * the given input/output stream pair. > - * > - * @param out the output stream of the underlying connection > - * > - * @param in the input stream of the underlying connection > - **/ > - public MuxClient(OutputStream out, InputStream in) throws IOException { > - super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024); > - } > - > - public MuxClient(SocketChannel channel) throws IOException { > - super(channel, Mux.CLIENT, clientInitialInboundRation, 1024); > - } > - > - /** > - * Starts a new request over this connection, returning the > - * corresponding OutboundRequest object. > - * > - * @return the OutboundRequest for the newly created request > - **/ > - public OutboundRequest newRequest() throws IOException { > - synchronized (muxLock) { > - if (muxDown) { > - IOException ioe = new IOException(muxDownMessage); > - ioe.initCause(muxDownCause); > - throw ioe; > - } > - int sessionID = busySessions.nextClearBit(0); > - if (sessionID > Mux.MAX_SESSION_ID) { > - throw new IOException("no free sessions"); > - } > - > - Session session = new Session(this, sessionID, Session.CLIENT); > - addSession(sessionID, session); > - return session.getOutboundRequest(); > - } > - } > - > - /** > - * Returns the current number of requests in progress over this > - * connection. > - * > - * The value is guaranteed to not increase until the next > - * invocation of the newRequest method. 
> - * > - * @return the number of requests in progress over this connection > - * > - * @throws IOException if the multiplexed connection is no longer > - * active > - **/ > - public int requestsInProgress() throws IOException { > - synchronized (muxLock) { > - if (muxDown) { > - IOException ioe = new IOException(muxDownMessage); > - ioe.initCause(muxDownCause); > - throw ioe; > - } > - return busySessions.cardinality(); > - } > - } > - > - /** > - * Shuts down this multiplexed connection. Requests in progress > - * will throw IOException for future I/O operations. > - * > - * @param message reason for shutdown to be included in > - * IOExceptions thrown from future I/O operations > - **/ > - public void shutdown(String message) { > - synchronized (muxLock) { > - setDown(message, null); > - } > - } > - > - /** > - * Populates the context collection with information representing > - * this connection. > - * > - * This method should be overridden by subclasses to implement the > - * desired behavior of the populateContext method for > - * OutboundRequest instances generated for this connection. > - **/ > - protected void populateContext(Collection context) { > - } > -} > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
> + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +package org.apache.river.jeri.internal.mux; > + > +import org.apache.river.action.GetIntegerAction; > +import java.io.IOException; > +import java.io.InputStream; > +import java.io.OutputStream; > +import java.nio.channels.SocketChannel; > +import java.security.AccessController; > +import java.util.Collection; > +import net.jini.jeri.OutboundRequest; > + > +/** > + * A MuxClient controls the client side of multiplexed connection. > + * > + * @author Sun Microsystems, Inc. > + **/ > +public class MuxClient extends Mux { > + > + /** initial inbound ration as client, default is 32768 */ > + private static final int clientInitialInboundRation = > + ((Integer) AccessController.doPrivileged(new GetIntegerAction( > + "org.apache.river.jeri.connection.mux.client.initialInboundRation", > + 32768))).intValue(); > + > + /** > + * Initiates the client side of the multiplexed connection over > + * the given input/output stream pair. > + * > + * @param out the output stream of the underlying connection > + * > + * @param in the input stream of the underlying connection > + **/ > + public MuxClient(OutputStream out, InputStream in) throws IOException { > + super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024); > + } > + > + public MuxClient(SocketChannel channel) throws IOException { > + super(channel, Mux.CLIENT, clientInitialInboundRation, 1024); > + } > + > + /** > + * Starts a new request over this connection, returning the > + * corresponding OutboundRequest object. 
> + * > + * @return the OutboundRequest for the newly created request > + **/ > + public OutboundRequest newRequest() throws IOException { > + synchronized (muxLock) { > + if (muxDown) { > + IOException ioe = new IOException(muxDownMessage); > + ioe.initCause(muxDownCause); > + throw ioe; > + } > + byte sessionID = (byte) busySessions.nextClearBit(0); > + if (sessionID > Mux.MAX_SESSION_ID) { > + throw new IOException("no free sessions"); > + } > + > + Session session = new Session(this, sessionID, Session.CLIENT); > + addSession(sessionID, session); > + return session.getOutboundRequest(); > + } > + } > + > + /** > + * Returns the current number of requests in progress over this > + * connection. > + * > + * The value is guaranteed to not increase until the next > + * invocation of the newRequest method. > + * > + * @return the number of requests in progress over this connection > + * > + * @throws IOException if the multiplexed connection is no longer > + * active > + **/ > + public int requestsInProgress() throws IOException { > + synchronized (muxLock) { > + if (muxDown) { > + IOException ioe = new IOException(muxDownMessage); > + ioe.initCause(muxDownCause); > + throw ioe; > + } > + return busySessions.cardinality(); > + } > + } > + > + /** > + * Shuts down this multiplexed connection. Requests in progress > + * will throw IOException for future I/O operations. > + * > + * @param message reason for shutdown to be included in > + * IOExceptions thrown from future I/O operations > + **/ > + public void shutdown(String message) { > + synchronized (muxLock) { > + setDown(message, null); > + } > + } > + > + /** > + * Populates the context collection with information representing > + * this connection. > + * > + * This method should be overridden by subclasses to implement the > + * desired behavior of the populateContext method for > + * OutboundRequest instances generated for this connection. 
> + **/ > + protected void populateContext(Collection context) { > + } > +} > > Modified: > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java > URL: > http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java?rev=1716613&r1=1716612&r2=1716613&view=diff > ============================================================================== > --- > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java > (original) > +++ > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java > Thu Nov 26 11:56:32 2015 > @@ -1,308 +1,308 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > - > -package org.apache.river.jeri.internal.mux; > - > -import java.io.IOException; > -import java.io.InputStream; > -import java.nio.ByteBuffer; > -import java.util.Deque; > -import java.util.LinkedList; > - > -/** > - * Output stream returned by OutboundRequests and InboundRequests for > - * a session of a multiplexed connection. 
> - */ > -class MuxInputStream extends InputStream { > - private final Object sessionLock; > - private final Session session; > - private final Mux mux; > - private final Deque<ByteBuffer> inBufQueue; > - private IOException sessionDown = null; > - private int inBufRemaining = 0; > - private int inBufPos = 0; > - private boolean inEOF = false; > - private boolean inClosed = false; > - private boolean sentAcknowledgment = false; > - > - MuxInputStream(Mux mux, Session session, Object sessionLock) { > - this.mux = mux; > - this.session = session; > - this.sessionLock = sessionLock; > - this.inBufQueue = new LinkedList<ByteBuffer>(); > - } > - > - void down(IOException e) { > - sessionDown = e; > - } > - > - void appendToBufQueue(ByteBuffer data) { > - inBufQueue.addLast(data); > - } > - > - @Override > - public int read() throws IOException { > - synchronized (sessionLock) { > - if (inClosed) { > - throw new IOException("stream closed"); > - } > - while (inBufRemaining == 0 && sessionDown == null && > session.getInState() <= Session.OPEN && !inClosed) { > - if (session.getInState() == Session.IDLE) { > - assert session.getOutState() == Session.IDLE; > - mux.asyncSendData(Mux.Data | Mux.Data_open, > session.sessionID, null); > - session.setOutState(Session.OPEN); > - session.setInState(Session.OPEN); > - } > - if (!session.inRationInfinite && session.getInRation() == 0) > { > - int inc = mux.initialInboundRation; > - mux.asyncSendIncrementRation(session.sessionID, inc); > - session.setInRation(session.getInRation() + inc); > - } > - try { > - sessionLock.wait(); // REMIND: timeout? 
> - } catch (InterruptedException e) { > - String message = "request I/O interrupted"; > - session.setDown(message, e); > - throw wrap(message, e); > - } > - } > - if (inClosed) { > - throw new IOException("stream closed"); > - } > - if (inBufRemaining == 0) { > - if (inEOF) { > - return -1; > - } else { > - if (session.getInState() == Session.TERMINATED) { > - throw new IOException("request aborted by remote > endpoint"); > - } > - assert sessionDown != null; > - throw sessionDown; > - } > - } > - assert inBufQueue.size() > 0; > - int result = -1; > - while (result == -1) { > - ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); > - if (inBufPos < buf.limit()) { > - result = (buf.get() & 0xFF); > - inBufPos++; > - inBufRemaining--; > - } > - if (inBufPos == buf.limit()) { > - inBufQueue.removeFirst(); > - inBufPos = 0; > - } > - } > - if (!session.inRationInfinite) { > - checkInboundRation(); > - } > - return result; > - } > - } > - > - private IOException wrap(String message, Exception e) { > - Throwable t; > - if (Session.traceSupression()) { > - t = e; > - } else { > - t = e.fillInStackTrace(); > - } > - return new IOException(message, t); > - } > - > - @Override > - public int read(byte[] b, int off, int len) throws IOException { > - if (b == null) { > - throw new NullPointerException(); > - } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + > len) > b.length) || ((off + len) < 0)) { > - throw new IndexOutOfBoundsException(); > - } > - synchronized (sessionLock) { > - if (inClosed) { > - throw new IOException("stream closed"); > - } else if (len == 0) { > - /* > - * REMIND: What if > - * - stream is at EOF? > - * - session was aborted? 
> - */ > - return 0; > - } > - while (inBufRemaining == 0 && sessionDown == null && > session.getInState() <= Session.OPEN && !inClosed) { > - if (session.getInState() == Session.IDLE) { > - assert session.getOutState() == Session.IDLE; > - mux.asyncSendData(Mux.Data | Mux.Data_open, > session.sessionID, null); > - session.setOutState(Session.OPEN); > - session.setInState(Session.OPEN); > - } > - if (!session.inRationInfinite && session.getInRation() == 0) > { > - int inc = mux.initialInboundRation; > - mux.asyncSendIncrementRation(session.sessionID, inc); > - session.setInRation(session.getInRation() + inc); > - } > - try { > - sessionLock.wait(); // REMIND: timeout? > - } catch (InterruptedException e) { > - String message = "request I/O interrupted"; > - session.setDown(message, e); > - throw wrap(message, e); > - } > - } > - if (inClosed) { > - throw new IOException("stream closed"); > - } > - if (inBufRemaining == 0) { > - if (inEOF) { > - return -1; > - } else { > - if (session.getInState() == Session.TERMINATED) { > - throw new IOException("request aborted by remote > endpoint"); > - } > - assert sessionDown != null; > - throw sessionDown; > - } > - } > - assert inBufQueue.size() > 0; > - int remaining = len; > - while (remaining > 0 && inBufRemaining > 0) { > - ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); > - if (inBufPos < buf.limit()) { > - int toCopy = Math.min(buf.limit() - inBufPos, remaining); > - buf.get(b, off, toCopy); > - inBufPos += toCopy; > - inBufRemaining -= toCopy; > - off += toCopy; > - remaining -= toCopy; > - } > - if (inBufPos == buf.limit()) { > - inBufQueue.removeFirst(); > - inBufPos = 0; > - } > - } > - if (!session.inRationInfinite) { > - checkInboundRation(); > - } > - return len - remaining; > - } > - } > - > - /** > - * Sends ration increment, if read drained buffers below > - * a certain mark. 
> - * > - * This method must NOT be invoked if the inbound ration in > - * infinite, and it must ONLY be invoked while synchronized on > - * this session's lock. > - * > - * REMIND: The implementation of this action will be a > - * significant area for performance tuning. > - */ > - private void checkInboundRation() { > - assert Thread.holdsLock(sessionLock); > - assert !session.inRationInfinite; > - if (session.getInState() >= Session.FINISHED) { > - return; > - } > - int mark = mux.initialInboundRation / 2; > - int used = inBufRemaining + session.getInRation(); > - if (used <= mark) { > - int inc = mux.initialInboundRation - used; > - mux.asyncSendIncrementRation(session.sessionID, inc); > - session.setInRation(session.getInRation() + inc); > - } > - } > - > - @Override > - public int available() throws IOException { > - synchronized (sessionLock) { > - if (inClosed) { > - throw new IOException("stream closed"); > - } > - /* > - * REMIND: What if > - * - stream is at EOF? > - * - session was aborted? > - */ > - return inBufRemaining; > - } > - } > - > - @Override > - public void close() { > - synchronized (sessionLock) { > - if (inClosed) { > - return; > - } > - inClosed = true; > - inBufQueue.clear(); // allow GC of unread data > - if (session.role == Session.CLIENT && !sentAcknowledgment && > session.isReceivedAckRequired() && session.getOutState() < > Session.TERMINATED) { > - mux.asyncSendAcknowledgment(session.sessionID); > - sentAcknowledgment = true; > - /* > - * If removing this session from the connection's > - * table was delayed in order to be able to send > - * an Acknowledgment message, then take care of > - * removing it now. 
> - */ > - if (session.isRemoveLater()) { > - session.setOutState(Session.TERMINATED); > - mux.removeSession(session.sessionID); > - session.setRemoveLater(false); > - } > - } > - sessionLock.notifyAll(); > - } > - } > - > - /** > - * @return the sentAcknowledgment > - */ > - boolean isSentAcknowledgment() { > - return sentAcknowledgment; > - } > - > - /** > - * @return the inBufRemaining > - */ > - int getBufRemaining() { > - return inBufRemaining; > - } > - > - /** > - * @return the inClosed > - */ > - boolean isClosed() { > - return inClosed; > - } > - > - /** > - * @param inBufRemaining the inBufRemaining to set > - */ > - void setBufRemaining(int inBufRemaining) { > - this.inBufRemaining = inBufRemaining; > - } > - > - /** > - * @param inEOF the inEOF to set > - */ > - void setEOF(boolean inEOF) { > - this.inEOF = inEOF; > - } > - > -} > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +package org.apache.river.jeri.internal.mux; > + > +import java.io.IOException; > +import java.io.InputStream; > +import java.nio.ByteBuffer; > +import java.util.Deque; > +import java.util.LinkedList; > + > +/** > + * Output stream returned by OutboundRequests and InboundRequests for > + * a session of a multiplexed connection. > + */ > +class MuxInputStream extends InputStream { > + private final Object sessionLock; > + private final Session session; > + private final Mux mux; > + private final Deque<ByteBuffer> inBufQueue; > + private IOException sessionDown = null; > + private int inBufRemaining = 0; > + private int inBufPos = 0; > + private boolean inEOF = false; > + private boolean inClosed = false; > + private boolean sentAcknowledgment = false; > + > + MuxInputStream(Mux mux, Session session, Object sessionLock) { > + this.mux = mux; > + this.session = session; > + this.sessionLock = sessionLock; > + this.inBufQueue = new LinkedList<ByteBuffer>(); > + } > + > + void down(IOException e) { > + sessionDown = e; > + } > + > + void appendToBufQueue(ByteBuffer data) { > + inBufQueue.addLast(data); > + } > + > + @Override > + public int read() throws IOException { > + synchronized (sessionLock) { > + if (inClosed) { > + throw new IOException("stream closed"); > + } > + while (inBufRemaining == 0 && sessionDown == null && > session.getInState() <= Session.OPEN && !inClosed) { > + if (session.getInState() == Session.IDLE) { > + assert session.getOutState() == Session.IDLE; > + mux.asyncSendData(Mux.Data | Mux.Data_open, > session.sessionID, null); > + session.setOutState(Session.OPEN); > + session.setInState(Session.OPEN); > + } > + if (!session.inRationInfinite && session.getInRation() == 0) > { > + int inc = mux.initialInboundRation; > + mux.asyncSendIncrementRation(session.sessionID, inc); > + session.setInRation(session.getInRation() + inc); > + } > + try { > + sessionLock.wait(5000L); // REMIND: timeout? 
> + } catch (InterruptedException e) { > + String message = "request I/O interrupted"; > + session.setDown(message, e); > + throw wrap(message, e); > + } > + } > + if (inClosed) { > + throw new IOException("stream closed"); > + } > + if (inBufRemaining == 0) { > + if (inEOF) { > + return -1; > + } else { > + if (session.getInState() == Session.TERMINATED) { > + throw new IOException("request aborted by remote > endpoint"); > + } > + assert sessionDown != null; > + throw sessionDown; > + } > + } > + assert inBufQueue.size() > 0; > + int result = -1; > + while (result == -1) { > + ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); > + if (inBufPos < buf.limit()) { > + result = (buf.get() & 0xFF); > + inBufPos++; > + inBufRemaining--; > + } > + if (inBufPos == buf.limit()) { > + inBufQueue.removeFirst(); > + inBufPos = 0; > + } > + } > + if (!session.inRationInfinite) { > + checkInboundRation(); > + } > + return result; > + } > + } > + > + private IOException wrap(String message, Exception e) { > + Throwable t; > + if (Session.traceSupression()) { > + t = e; > + } else { > + t = e.fillInStackTrace(); > + } > + return new IOException(message, t); > + } > + > + @Override > + public int read(byte[] b, int off, int len) throws IOException { > + if (b == null) { > + throw new NullPointerException(); > + } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + > len) > b.length) || ((off + len) < 0)) { > + throw new IndexOutOfBoundsException(); > + } > + synchronized (sessionLock) { > + if (inClosed) { > + throw new IOException("stream closed"); > + } else if (len == 0) { > + /* > + * REMIND: What if > + * - stream is at EOF? > + * - session was aborted? 
> + */ > + return 0; > + } > + while (inBufRemaining == 0 && sessionDown == null && > session.getInState() <= Session.OPEN && !inClosed) { > + if (session.getInState() == Session.IDLE) { > + assert session.getOutState() == Session.IDLE; > + mux.asyncSendData(Mux.Data | Mux.Data_open, > session.sessionID, null); > + session.setOutState(Session.OPEN); > + session.setInState(Session.OPEN); > + } > + if (!session.inRationInfinite && session.getInRation() == 0) > { > + int inc = mux.initialInboundRation; > + mux.asyncSendIncrementRation(session.sessionID, inc); > + session.setInRation(session.getInRation() + inc); > + } > + try { > + sessionLock.wait(5000L); // REMIND: timeout? > + } catch (InterruptedException e) { > + String message = "request I/O interrupted"; > + session.setDown(message, e); > + throw wrap(message, e); > + } > + } > + if (inClosed) { > + throw new IOException("stream closed"); > + } > + if (inBufRemaining == 0) { > + if (inEOF) { > + return -1; > + } else { > + if (session.getInState() == Session.TERMINATED) { > + throw new IOException("request aborted by remote > endpoint"); > + } > + assert sessionDown != null; > + throw sessionDown; > + } > + } > + assert inBufQueue.size() > 0; > + int remaining = len; > + while (remaining > 0 && inBufRemaining > 0) { > + ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst(); > + if (inBufPos < buf.limit()) { > + int toCopy = Math.min(buf.limit() - inBufPos, remaining); > + buf.get(b, off, toCopy); > + inBufPos += toCopy; > + inBufRemaining -= toCopy; > + off += toCopy; > + remaining -= toCopy; > + } > + if (inBufPos == buf.limit()) { > + inBufQueue.removeFirst(); > + inBufPos = 0; > + } > + } > + if (!session.inRationInfinite) { > + checkInboundRation(); > + } > + return len - remaining; > + } > + } > + > + /** > + * Sends ration increment, if read drained buffers below > + * a certain mark. 
> + * > + * This method must NOT be invoked if the inbound ration in > + * infinite, and it must ONLY be invoked while synchronized on > + * this session's lock. > + * > + * REMIND: The implementation of this action will be a > + * significant area for performance tuning. > + */ > + private void checkInboundRation() { > + assert Thread.holdsLock(sessionLock); > + assert !session.inRationInfinite; > + if (session.getInState() >= Session.FINISHED) { > + return; > + } > + int mark = mux.initialInboundRation / 2; > + int used = inBufRemaining + session.getInRation(); > + if (used <= mark) { > + int inc = mux.initialInboundRation - used; > + mux.asyncSendIncrementRation(session.sessionID, inc); > + session.setInRation(session.getInRation() + inc); > + } > + } > + > + @Override > + public int available() throws IOException { > + synchronized (sessionLock) { > + if (inClosed) { > + throw new IOException("stream closed"); > + } > + /* > + * REMIND: What if > + * - stream is at EOF? > + * - session was aborted? > + */ > + return inBufRemaining; > + } > + } > + > + @Override > + public void close() { > + synchronized (sessionLock) { > + if (inClosed) { > + return; > + } > + inClosed = true; > + inBufQueue.clear(); // allow GC of unread data > + if (session.role == Session.CLIENT && !sentAcknowledgment && > session.isReceivedAckRequired() && session.getOutState() < > Session.TERMINATED) { > + mux.asyncSendAcknowledgment(session.sessionID); > + sentAcknowledgment = true; > + /* > + * If removing this session from the connection's > + * table was delayed in order to be able to send > + * an Acknowledgment message, then take care of > + * removing it now. 
> + */ > + if (session.isRemoveLater()) { > + session.setOutState(Session.TERMINATED); > + mux.removeSession(session.sessionID); > + session.setRemoveLater(false); > + } > + } > + sessionLock.notifyAll(); > + } > + } > + > + /** > + * @return the sentAcknowledgment > + */ > + boolean isSentAcknowledgment() { > + return sentAcknowledgment; > + } > + > + /** > + * @return the inBufRemaining > + */ > + int getBufRemaining() { > + return inBufRemaining; > + } > + > + /** > + * @return the inClosed > + */ > + boolean isClosed() { > + return inClosed; > + } > + > + /** > + * @param inBufRemaining the inBufRemaining to set > + */ > + void setBufRemaining(int inBufRemaining) { > + this.inBufRemaining = inBufRemaining; > + } > + > + /** > + * @param inEOF the inEOF to set > + */ > + void setEOF(boolean inEOF) { > + this.inEOF = inEOF; > + } > + > +} > > Modified: > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java > URL: > http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java?rev=1716613&r1=1716612&r2=1716613&view=diff > ============================================================================== > --- > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java > (original) > +++ > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java > Thu Nov 26 11:56:32 2015 > @@ -210,7 +210,7 @@ public class MuxServer extends Mux { > dispatchNewRequest(sessionID); > return; > } else { > - session = (Session) sessions.get(Integer.valueOf(sessionID)); > + session = sessions[sessionID]; > assert session != null; > } > } > > Modified: > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java > URL: > 
http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java?rev=1716613&r1=1716612&r2=1716613&view=diff > ============================================================================== > --- > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java > (original) > +++ > river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java > Thu Nov 26 11:56:32 2015 > @@ -1,434 +1,434 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. 
> - */ > - > -package org.apache.river.jeri.internal.mux; > - > -import org.apache.river.logging.Levels; > -import org.apache.river.thread.Executor; > -import org.apache.river.thread.GetThreadPoolAction; > -import java.io.EOFException; > -import java.io.IOException; > -import java.io.InputStream; > -import java.io.OutputStream; > -import java.nio.ByteBuffer; > -import java.nio.channels.ReadableByteChannel; > -import java.nio.channels.WritableByteChannel; > -import java.security.AccessController; > -import java.util.Deque; > -import java.util.LinkedList; > -import java.util.logging.Level; > -import java.util.logging.Logger; > - > -/** > - * StreamConnectionIO implements the ConnectionIO abstraction for a > - * connection accessible through standard (blocking) I/O streams, i.e. > - * java.io.OutputStream and java.io.InputStream. > - * > - * @author Sun Microsystems, Inc. > - **/ > -final class StreamConnectionIO extends ConnectionIO { > - > - private static final int RECEIVE_BUFFER_SIZE = 2048; > - > - /** > - * pool of threads for executing tasks in system thread group: > - * used for I/O (reader and writer) threads > - */ > - private static final Executor systemThreadPool = > - (Executor) AccessController.doPrivileged( > - new GetThreadPoolAction(false)); > - > - /** mux logger */ > - private static final Logger logger = > - Logger.getLogger("net.jini.jeri.connection.mux"); > - > - /** I/O streams for underlying connection */ > - private final OutputStream out; > - private final InputStream in; > - > - /** channels wrapped around underlying I/O streams */ > - private final WritableByteChannel outChannel; > - private final ReadableByteChannel inChannel; > - > - /** > - * queue of buffers of data to be sent over connection, interspersed > - * with IOFuture objects that need to be notified in sequence > - * > - * Synchronised on super.mux.muxLock; > - */ > - private final Deque sendQueue; > - > - > - /** > - * Creates a new StreamConnectionIO for the connection 
represented by > - * the supplied OutputStream and InputStream pair. > - */ > - StreamConnectionIO(Mux mux, OutputStream out, InputStream in) { > - super(mux); > - this.out = out; > -// this.out = new BufferedOutputStream(out); > - this.in = in; > - > - outChannel = newChannel(out); > - inChannel = newChannel(in); > - sendQueue = new LinkedList(); > - } > - > - /** > - * Starts processing connection data. This method starts > - * asynchronous actions to read and write from the connection. > - */ > - @Override > - void start() throws IOException { > - try { > - systemThreadPool.execute(new Writer(), "mux writer"); > - systemThreadPool.execute(new Reader(), "mux reader"); > - } catch (OutOfMemoryError e) { // assume out of threads > - try { > - logger.log(Level.WARNING, > - "could not create thread for request dispatch", e); > - } catch (Throwable t) { > - } > - throw new IOException("could not create I/O threads", e); > - } > - } > - > - @Override > - void asyncSend(ByteBuffer buffer) { > - synchronized (mux.muxLock) { > - if (mux.muxDown) { > - return; > - } > - sendQueue.addLast(buffer); > - mux.muxLock.notifyAll(); > - } > - } > - > - @Override > - void asyncSend(ByteBuffer first, ByteBuffer second) { > - synchronized (mux.muxLock) { > - if (mux.muxDown) { > - return; > - } > - sendQueue.addLast(first); > - sendQueue.addLast(second); > - mux.muxLock.notifyAll(); > - } > - } > - > - @Override > - IOFuture futureSend(ByteBuffer first, ByteBuffer second) { > - synchronized (mux.muxLock) { > - IOFuture future = new IOFuture(); > - if (mux.muxDown) { > - IOException ioe = new IOException(mux.muxDownMessage); > - ioe.initCause(mux.muxDownCause); > - future.done(ioe); > - return future; > - } > - sendQueue.addLast(first); > - sendQueue.addLast(second); > - sendQueue.addLast(future); > - mux.muxLock.notifyAll(); > - return future; > - } > - /* > - * REMIND: Can/should we implement any sort of > - * priority inversion avoidance scheme here? 
> - */ > - } > - > - private class Writer implements Runnable { > - Writer() { } > - > - @Override > - public void run() { > - Deque localQueue = null; > - try { > - while (true) { > - synchronized (mux.muxLock) { > - while (!mux.muxDown && sendQueue.isEmpty()) { > - /* > - * REMIND: Should we use a timeout here, to send > - * occasional PING messages during periods of > - * inactivity, to make sure connection is alive? > - */ > - mux.muxLock.wait(); > - /* > - * Let an interrupt during the wait just kill this > - * thread, because an interrupt during an I/O write > - * would leave it in an unrecoverable state anyway. > - */ > - } > - if (mux.muxDown && sendQueue.isEmpty()) { > - logger.log(Level.FINEST, > - "mux writer thread dying, connection " + > - "down and nothing more to send"); > - break; > - } > - /* Clone an unshared copy and clear the queue while > synchronized */ > - localQueue = new LinkedList(sendQueue); > - sendQueue.clear(); > - } > - > - boolean needToFlush = false; > - ByteBuffer last = null; > - int lastIndex = Integer.MIN_VALUE; > - for ( int i = 0; !localQueue.isEmpty(); i++) { > - Object next = localQueue.getFirst(); > - if (next instanceof ByteBuffer) { > - ByteBuffer buffer = (ByteBuffer) next; > - outChannel.write((buffer)); > - last = buffer; > - lastIndex = i; > - needToFlush = true; > - } else { > - assert next instanceof IOFuture; > - if (needToFlush) { > - out.flush(); > - needToFlush = false; > - } > - if (lastIndex == i - 1 && last != null){ > - ((IOFuture) next).done(last.position()); > - } else { > - ((IOFuture) next).done(); > - } > - } > - localQueue.removeFirst(); > - } > - if (needToFlush) { > - out.flush(); > - } > - } > - } catch (InterruptedException e) { > - try { > - logger.log(Level.WARNING, > - "mux writer thread dying, interrupted", e); > - } catch (Throwable t) { > - } > - mux.setDown("mux writer thread interrupted", e); > - } catch (IOException e) { > - try { > - logger.log(Levels.HANDLED, > - "mux writer thread 
dying, I/O error", e); > - } catch (Throwable t) { > - } > - mux.setDown("I/O error writing to mux connection: " + > - e.toString(), e); > - } catch (Throwable t) { > - try { > - logger.log(Level.WARNING, > - "mux writer thread dying, unexpected exception", t); > - } catch (Throwable tt) { > - } > - mux.setDown("unexpected exception in mux writer thread: " + > - t.toString(), t); > - } finally { > - synchronized (mux.muxLock) { > - assert mux.muxDown; > - if (localQueue != null) { > - drainQueue(localQueue); > - } > - drainQueue(sendQueue); > - } > - try { > - outChannel.close(); > - } catch (IOException e) { > - } > - } > - } > - } > - > - private void drainQueue(Deque queue) { > - while (!queue.isEmpty()) { > - Object next = queue.removeFirst(); > - if (next instanceof IOFuture) { > - IOException ioe = new IOException(mux.muxDownMessage); > - ioe.initCause(mux.muxDownCause); > - ((IOFuture) next).done(ioe); > - } > - } > - } > - > - private class Reader implements Runnable { > - /** buffer for reading incoming data from connection */ > - private final ByteBuffer inputBuffer = > - ByteBuffer.allocate(RECEIVE_BUFFER_SIZE); // ready for > reading > - > - Reader() { } > - > - public void run() { > - try { > - while (true) { > - int n = inChannel.read(inputBuffer); > - if (n == -1) { > - throw new EOFException(); > - } > - assert n > 0; // channel is assumed to be blocking > - mux.processIncomingData(inputBuffer); > - assert inputBuffer.hasRemaining(); > - } > - } catch (ProtocolException e) { > - IOFuture future = null; > - synchronized (mux.muxLock) { > - /* > - * If mux connection is already down, then we probably got > - * here because of the receipt of a normal protocol-ending > - * message, like Shutdown or Error, or else something else > - * went wrong anyway. Otherwise, a real protocol violation > - * was detected, so respond with an Error message before > - * taking down the whole mux connection. 
> - */ > - if (!mux.muxDown) { > - try { > - logger.log(Levels.HANDLED, > - "mux reader thread dying, protocol error", e); > - } catch (Throwable t) { > - } > - future = mux.futureSendError(e.getMessage()); > - mux.setDown("protocol violation detected: " + > - e.getMessage(), null); > - } else { > - try { > - logger.log(Level.FINEST, > - "mux reader thread dying: " + e.getMessage()); > - } catch (Throwable t) { > - } > - } > - } > - if (future != null) { > - try { > - future.waitUntilDone(); > - } catch (IOException ignore) { > - } catch (InterruptedException interrupt) { > - Thread.currentThread().interrupt(); > - } > - } > - } catch (IOException e) { > - try { > - logger.log(Levels.HANDLED, > - "mux reader thread dying, I/O error", e); > - } catch (Throwable t) { > - } > - mux.setDown("I/O error reading from mux connection: " + > - e.toString(), e); > - } catch (Throwable t) { > - try { > - logger.log(Level.WARNING, > - "mux reader thread dying, unexpected exception", t); > - } catch (Throwable tt) { > - } > - mux.setDown("unexpected exception in mux reader thread: " + > - t.toString(), t); > - } finally { > - try { > - inChannel.close(); > - } catch (IOException e) { > - } > - } > - } > - } > - > - /** > - * The following two methods are modifications of their > - * equivalents in java.nio.channels.Channels with the assumption > - * that the supplied byte buffers are backed by arrays, so no > - * additional copying is required. 
> - */ > - > - public static ReadableByteChannel newChannel(final InputStream in) { > - return new ReadableByteChannel() { > - private volatile boolean open = true; > - > - // must be synchronized as per ReadableByteChannel contract > - @Override > - public synchronized int read(ByteBuffer dst) throws IOException { > - assert dst.hasArray(); > - byte[] array = dst.array(); > - int arrayOffset = dst.arrayOffset(); > - > - int totalRead = 0; > - int bytesRead = 0; > - int bytesToRead; > - while ((bytesToRead = dst.remaining()) > 0) { > - if ((totalRead > 0) && !(in.available() > 0)) { > - break; // block at most once > - } > - int pos = dst.position(); > - bytesRead = in.read(array, arrayOffset + pos, bytesToRead); > - if (bytesRead < 0) { > - break; > - } else { > - dst.position(pos + bytesRead); > - totalRead += bytesRead; > - } > - } > - if ((bytesRead < 0) && (totalRead == 0)) { > - return -1; > - } > - > - return totalRead; > - } > - > - @Override > - public boolean isOpen() { > - return open; > - } > - > - // Blocking as per Channel contract > - @Override > - public synchronized void close() throws IOException { > - in.close(); > - open = false; > - } > - }; > - } > - > - public static WritableByteChannel newChannel(final OutputStream out) { > - return new WritableByteChannel() { > - private volatile boolean open = true; > - > - // This method must block while writing as per > WritableByteChannel contract. 
> - @Override > - public synchronized int write(ByteBuffer src) throws IOException { > - assert src.hasArray(); > - > - int len = src.remaining(); > - if (len > 0) { > - int pos = src.position(); > - out.write(src.array(), src.arrayOffset() + pos, len); > - src.position(pos + len); > - } > - return len; > - } > - > - @Override > - public boolean isOpen() { > - return open; > - } > - > - // This method must block as per the Channel contract > - @Override > - public synchronized void close() throws IOException { > - out.close(); > - open = false; > - } > - }; > - } > - > -} > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +package org.apache.river.jeri.internal.mux; > + > +import org.apache.river.logging.Levels; > +import org.apache.river.thread.Executor; > +import org.apache.river.thread.GetThreadPoolAction; > +import java.io.EOFException; > +import java.io.IOException; > +import java.io.InputStream; > +import java.io.OutputStream; > +import java.nio.ByteBuffer; > +import java.nio.channels.ReadableByteChannel; > +import java.nio.channels.WritableByteChannel; > +import java.security.AccessController; > +import java.util.Deque; > +import java.util.LinkedList; > +import java.util.logging.Level; > +import java.util.logging.Logger; > + > +/** > + * StreamConnectionIO implements the ConnectionIO abstraction for a > + * connection accessible through standard (blocking) I/O streams, i.e. > + * java.io.OutputStream and java.io.InputStream. > + * > + * @author Sun Microsystems, Inc. > + **/ > +final class StreamConnectionIO extends ConnectionIO { > + > + private static final int RECEIVE_BUFFER_SIZE = 2048; > + > + /** > + * pool of threads for executing tasks in system thread group: > + * used for I/O (reader and writer) threads > + */ > + private static final Executor systemThreadPool = > + (Executor) AccessController.doPrivileged( > + new GetThreadPoolAction(false)); > + > + /** mux logger */ > + private static final Logger logger = > + Logger.getLogger("net.jini.jeri.connection.mux"); > + > + /** I/O streams for underlying connection */ > + private final OutputStream out; > + private final InputStream in; > + > + /** channels wrapped around underlying I/O streams */ > + private final WritableByteChannel outChannel; > + private final ReadableByteChannel inChannel; > + > + /** > + * queue of buffers of data to be sent over connection, interspersed > + * with IOFuture objects that need to be notified in sequence > + * > + * Synchronised on super.mux.muxLock; > + */ > + private final Deque sendQueue; > + > + > + /** > + * Creates a new StreamConnectionIO for the connection 
represented by > + * the supplied OutputStream and InputStream pair. > + */ > + StreamConnectionIO(Mux mux, OutputStream out, InputStream in) { > + super(mux); > + this.out = out; > +// this.out = new BufferedOutputStream(out); > + this.in = in; > + > + outChannel = newChannel(out); > + inChannel = newChannel(in); > + sendQueue = new LinkedList(); > + } > + > + /** > + * Starts processing connection data. This method starts > + * asynchronous actions to read and write from the connection. > + */ > + @Override > + void start() throws IOException { > + try { > + systemThreadPool.execute(new Writer(), "mux writer"); > + systemThreadPool.execute(new Reader(), "mux reader"); > + } catch (OutOfMemoryError e) { // assume out of threads > + try { > + logger.log(Level.WARNING, > + "could not create thread for request dispatch", e); > + } catch (Throwable t) { > + } > + throw new IOException("could not create I/O threads", e); > + } > + } > + > + @Override > + void asyncSend(ByteBuffer buffer) { > + synchronized (mux.muxLock) { > + if (mux.muxDown) { > + return; > + } > + sendQueue.addLast(buffer); > + mux.muxLock.notifyAll(); > + } > + } > + > + @Override > + void asyncSend(ByteBuffer first, ByteBuffer second) { > + synchronized (mux.muxLock) { > + if (mux.muxDown) { > + return; > + } > + sendQueue.addLast(first); > + sendQueue.addLast(second); > + mux.muxLock.notifyAll(); > + } > + } > + > + @Override > + IOFuture futureSend(ByteBuffer first, ByteBuffer second) { > + synchronized (mux.muxLock) { > + IOFuture future = new IOFuture(); > + if (mux.muxDown) { > + IOException ioe = new IOException(mux.muxDownMessage); > + ioe.initCause(mux.muxDownCause); > + future.done(ioe); > + return future; > + } > + sendQueue.addLast(first); > + sendQueue.addLast(second); > + sendQueue.addLast(future); > + mux.muxLock.notifyAll(); > + return future; > + } > + /* > + * REMIND: Can/should we implement any sort of > + * priority inversion avoidance scheme here? 
> + */ > + } > + > + private class Writer implements Runnable { > + Writer() { } > + > + @Override > + public void run() { > + Deque localQueue = null; > + try { > + while (true) { > + synchronized (mux.muxLock) { > + while (!mux.muxDown && sendQueue.isEmpty()) { > + /* > + * REMIND: Should we use a timeout here, to send > + * occasional PING messages during periods of > + * inactivity, to make sure connection is alive? > + */ > + mux.muxLock.wait(); > + /* > + * Let an interrupt during the wait just kill this > + * thread, because an interrupt during an I/O write > + * would leave it in an unrecoverable state anyway. > + */ > + } > + if (mux.muxDown && sendQueue.isEmpty()) { > + logger.log(Level.FINEST, > + "mux writer thread dying, connection " + > + "down and nothing more to send"); > + break; > + } > + /* Clone an unshared copy and clear the queue while > synchronized */ > + localQueue = new LinkedList(sendQueue); > + sendQueue.clear(); > + } > + > + boolean needToFlush = false; > + ByteBuffer last = null; > + int lastIndex = Integer.MIN_VALUE; > + for ( int i = 0; !localQueue.isEmpty(); i++) { > + Object next = localQueue.getFirst(); > + if (next instanceof ByteBuffer) { > + ByteBuffer buffer = (ByteBuffer) next; > + outChannel.write((buffer)); > + last = buffer; > + lastIndex = i; > + needToFlush = true; > + } else { > + assert next instanceof IOFuture; > + if (needToFlush) { > + out.flush(); > + needToFlush = false; > + } > + if (lastIndex == i - 1 && last != null){ > + ((IOFuture) next).done(last.position()); > + } else { > + ((IOFuture) next).done(); > + } > + } > + localQueue.removeFirst(); > + } > + if (needToFlush) { > + out.flush(); > + } > + } > + } catch (InterruptedException e) { > + try { > + logger.log(Level.WARNING, > + "mux writer thread dying, interrupted", e); > + } catch (Throwable t) { > + } > + mux.setDown("mux writer thread interrupted", e); > + } catch (IOException e) { > + try { > + logger.log(Levels.HANDLED, > + "mux writer thread 
dying, I/O error", e); > + } catch (Throwable t) { > + } > + mux.setDown("I/O error writing to mux connection: " + > + e.toString(), e); > + } catch (Throwable t) { > + try { > + logger.log(Level.WARNING, > + "mux writer thread dying, unexpected exception", t); > + } catch (Throwable tt) { > + } > + mux.setDown("unexpected exception in mux writer thread: " + > + t.toString(), t); > + } finally { > + synchronized (mux.muxLock) { > + assert mux.muxDown; > + if (localQueue != null) { > + drainQueue(localQueue); > + } > + drainQueue(sendQueue); > + } > + try { > + outChannel.close(); > + } catch (IOException e) { > + } > + } > + } > + } > + > + private void drainQueue(Deque queue) { > + while (!queue.isEmpty()) { > + Object next = queue.removeFirst(); > + if (next instanceof IOFuture) { > + IOException ioe = new IOException(mux.muxDownMessage); > + ioe.initCause(mux.muxDownCause); > + ((IOFuture) next).done(ioe); > + } > + } > + } > + > + private class Reader implements Runnable { > + /** buffer for reading incoming data from connection */ > + private final ByteBuffer inputBuffer = > + ByteBuffer.allocate(RECEIVE_BUFFER_SIZE); // ready for > reading > + > + Reader() { } > + > + public void run() { > + try { > + while (true) { > + int n = inChannel.read(inputBuffer); > + if (n == -1) { > + throw new EOFException(); > + } > + assert n > 0; // channel is assumed to be blocking > + mux.processIncomingData(inputBuffer); > + assert inputBuffer.hasRemaining(); > + } > + } catch (ProtocolException e) { > + IOFuture future = null; > + synchronized (mux.muxLock) { > + /* > + * If mux connection is already down, then we probably got > + * here because of the receipt of a normal protocol-ending > + * message, like Shutdown or Error, or else something else > + * went wrong anyway. Otherwise, a real protocol violation > + * was detected, so respond with an Error message before > + * taking down the whole mux connection. 
> + */ > + if (!mux.muxDown) { > + try { > + logger.log(Levels.HANDLED, > + "mux reader thread dying, protocol error", e); > + } catch (Throwable t) { > + } > + future = mux.futureSendError(e.getMessage()); > + mux.setDown("protocol violation detected: " + > + e.getMessage(), null); > + } else { > + try { > + logger.log(Level.FINEST, > + "mux reader thread dying: " + e.getMessage()); > + } catch (Throwable t) { > + } > + } > + } > + if (future != null) { > + try { > + future.waitUntilDone(); > + } catch (IOException ignore) { > + } catch (InterruptedException interrupt) { > + Thread.currentThread().interrupt(); > + } > + } > + } catch (IOException e) { > + try { > + logger.log(Levels.HANDLED, > + "mux reader thread dying, I/O error", e); > + } catch (Throwable t) { > + } > + mux.setDown("I/O error reading from mux connection: " + > + e.toString(), e); > + } catch (Throwable t) { > + try { > + logger.log(Level.WARNING, > + "mux reader thread dying, unexpected exception", t); > + } catch (Throwable tt) { > + } > + mux.setDown("unexpected exception in mux reader thread: " + > + t.toString(), t); > + } finally { > + try { > + inChannel.close(); > + } catch (IOException e) { > + } > + } > + } > + } > + > + /** > + * The following two methods are modifications of their > + * equivalents in java.nio.channels.Channels with the assumption > + * that the supplied byte buffers are backed by arrays, so no > + * additional copying is required. 
> + */ > + > + public static ReadableByteChannel newChannel(final InputStream in) { > + return new ReadableByteChannel() { > + private boolean open = true; > + > + // must be synchronized as per ReadableByteChannel contract > + @Override > + public synchronized int read(ByteBuffer dst) throws IOException { > + assert dst.hasArray(); > + byte[] array = dst.array(); > + int arrayOffset = dst.arrayOffset(); > + > + int totalRead = 0; > + int bytesRead = 0; > + int bytesToRead; > + while ((bytesToRead = dst.remaining()) > 0) { > + if ((totalRead > 0) && !(in.available() > 0)) { > + break; // block at most once > + } > + int pos = dst.position(); > + bytesRead = in.read(array, arrayOffset + pos, bytesToRead); > + if (bytesRead < 0) { > + break; > + } else { > + dst.position(pos + bytesRead); > + totalRead += bytesRead; > + } > + } > + if ((bytesRead < 0) && (totalRead == 0)) { > + return -1; > + } > + > + return totalRead; > + } > + > + @Override > + public synchronized boolean isOpen() { > + return open; > + } > + > + // Blocking as per Channel contract > + @Override > + public synchronized void close() throws IOException { > + in.close(); > + open = false; > + } > + }; > + } > + > + public static WritableByteChannel newChannel(final OutputStream out) { > + return new WritableByteChannel() { > + private volatile boolean open = true; > + > + // This method must block while writing as per > WritableByteChannel contract. 
> + @Override > + public synchronized int write(ByteBuffer src) throws IOException { > + assert src.hasArray(); > + > + int len = src.remaining(); > + if (len > 0) { > + int pos = src.position(); > + out.write(src.array(), src.arrayOffset() + pos, len); > + src.position(pos + len); > + } > + return len; > + } > + > + @Override > + public boolean isOpen() { > + return open; > + } > + > + // This method must block as per the Channel contract > + @Override > + public synchronized void close() throws IOException { > + out.close(); > + open = false; > + } > + }; > + } > + > +} > >