tomaswolf commented on code in PR #217: URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857178515
########## sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java: ########## @@ -90,52 +93,174 @@ public byte getCommandType() { return cmd; } - public void onWindowExpanded() throws IOException { - doWriteIfPossible(true); - } - + /** + * {@inheritDoc} + * + * This write operation is <em>asynchronous</em>: if there is not enough window space, it may keep the write pending + * or write only part of the buffer and keep the rest pending. Concurrent writes are not allowed and will throw a + * {@link WritePendingException}. Any subsequent write <em>must</em> occur only once the returned future is + * fulfilled; for instance triggered via a listener on the returned future. Try to avoid doing a subsequent write + * directly in a future listener, though; doing so may lead to deep chains of nested listener calls with deep stack + * traces, and may ultimately lead to a stack overflow. + * + * @throws WritePendingException if a concurrent write is attempted + */ @Override - public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws IOException { + public IoWriteFuture writeBuffer(Buffer buffer) throws IOException { if (isClosing()) { - throw new EOFException("Closing: " + state); + throw new EOFException("Closing: " + writeState); } IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, buffer); - if (!pendingWrite.compareAndSet(null, future)) { - throw new WritePendingException("A write operation is already pending"); + synchronized (writeState) { + if (writeState.isClosing()) { // Double check. 
+ throw new EOFException("Closing: " + writeState); + } + if (writeState.writeInProgress) { + throw new WritePendingException("A write operation is already pending"); + } + writeState.lastWrite = future; + writeState.pendingWrite = future; + writeState.writeInProgress = true; + writeState.waitingOnIo = false; } doWriteIfPossible(false); return future; } @Override protected void preClose() { - if (!(packetWriter instanceof Channel)) { - try { - packetWriter.close(); - } catch (IOException e) { - error("preClose({}) Failed ({}) to pre-close packet writer: {}", - this, e.getClass().getSimpleName(), e.getMessage(), e); + synchronized (writeState) { + writeState.openState = state.get(); + } + super.preClose(); + } + + @Override + protected void doCloseImmediately() { + try { + // Can't close this in preClose(); a graceful close waits for the currently pending write to finish and thus + // still needs the packet writer. + if (!(packetWriter instanceof Channel)) { + try { + packetWriter.close(); + } catch (IOException e) { + error("preClose({}) Failed ({}) to pre-close packet writer: {}", + this, e.getClass().getSimpleName(), e.getMessage(), e); + } } + super.doCloseImmediately(); + } finally { + shutdown(); } + } - super.preClose(); + private void shutdown() { + IoWriteFutureImpl current = null; + synchronized (writeState) { + writeState.openState = State.Closed; + current = writeState.pendingWrite; + writeState.pendingWrite = null; + writeState.waitingOnIo = false; + } + if (current != null) { + terminateFuture(current); + } + } + + private void terminateFuture(IoWriteFutureImpl future) { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: dev-unsubscr...@mina.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@mina.apache.org
For additional commands, e-mail: dev-h...@mina.apache.org