[ https://issues.apache.org/jira/browse/SSHD-966?focusedWorklogId=761510&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761510 ]
ASF GitHub Bot logged work on SSHD-966: --------------------------------------- Author: ASF GitHub Bot Created on: 24/Apr/22 20:59 Start Date: 24/Apr/22 20:59 Worklog Time Spent: 10m Work Description: tomaswolf commented on code in PR #217: URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857178515 ########## sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java: ########## @@ -90,52 +93,174 @@ public byte getCommandType() { return cmd; } - public void onWindowExpanded() throws IOException { - doWriteIfPossible(true); - } - + /** + * {@inheritDoc} + * + * This write operation is <em>asynchronous</em>: if there is not enough window space, it may keep the write pending + * or write only part of the buffer and keep the rest pending. Concurrent writes are not allowed and will throw a + * {@link WritePendingException}. Any subsequent write <em>must</em> occur only once the returned future is + * fulfilled; for instance triggered via a listener on the returned future. Try to avoid doing a subsequent write + * directly in a future listener, though; doing so may lead to deep chains of nested listener calls with deep stack + * traces, and may ultimately lead to a stack overflow. + * + * @throws WritePendingException if a concurrent write is attempted + */ @Override - public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws IOException { + public IoWriteFuture writeBuffer(Buffer buffer) throws IOException { if (isClosing()) { - throw new EOFException("Closing: " + state); + throw new EOFException("Closing: " + writeState); } IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, buffer); - if (!pendingWrite.compareAndSet(null, future)) { - throw new WritePendingException("A write operation is already pending"); + synchronized (writeState) { + if (writeState.isClosing()) { // Double check. 
+ throw new EOFException("Closing: " + writeState); + } + if (writeState.writeInProgress) { + throw new WritePendingException("A write operation is already pending"); + } + writeState.lastWrite = future; + writeState.pendingWrite = future; + writeState.writeInProgress = true; + writeState.waitingOnIo = false; } doWriteIfPossible(false); return future; } @Override protected void preClose() { - if (!(packetWriter instanceof Channel)) { - try { - packetWriter.close(); - } catch (IOException e) { - error("preClose({}) Failed ({}) to pre-close packet writer: {}", - this, e.getClass().getSimpleName(), e.getMessage(), e); + synchronized (writeState) { + writeState.openState = state.get(); + } + super.preClose(); + } + + @Override + protected void doCloseImmediately() { + try { + // Can't close this in preClose(); a graceful close waits for the currently pending write to finish and thus + // still needs the packet writer. + if (!(packetWriter instanceof Channel)) { + try { + packetWriter.close(); + } catch (IOException e) { + error("preClose({}) Failed ({}) to pre-close packet writer: {}", + this, e.getClass().getSimpleName(), e.getMessage(), e); + } } + super.doCloseImmediately(); + } finally { + shutdown(); } + } - super.preClose(); + private void shutdown() { + IoWriteFutureImpl current = null; + synchronized (writeState) { + writeState.openState = State.Closed; + current = writeState.pendingWrite; + writeState.pendingWrite = null; + writeState.waitingOnIo = false; + } + if (current != null) { + terminateFuture(current); + } + } + + private void terminateFuture(IoWriteFutureImpl future) { Review Comment: Done. 
Issue Time Tracking ------------------- Worklog Id: (was: 761510) Time Spent: 4h 20m (was: 4h 10m) > Deadlock on disconnection at the end of key-exchange > ---------------------------------------------------- > > Key: SSHD-966 > URL: https://issues.apache.org/jira/browse/SSHD-966 > Project: MINA SSHD > Issue Type: Bug > Affects Versions: 2.0.0 > Reporter: Francois Ferrand > Assignee: Lyor Goldstein > Priority: Major > Time Spent: 4h 20m > Remaining Estimate: 0h > > We are using git-repo to download projects from a Gerrit server, using SSH. > Gerrit is at version 2.16.16, which uses SSHD 2.0.0 and Mina 2.0.17 with NIO2 > backend. > One particularity of this setup is that git-repo creates a single control > master channel, and then downloads *lots* of Git repositories (500 > repositories, some of them relatively large), with some degree of > parallelism. This takes a long time and transfers lots of data, and the multiplexed > connections are handled by Gerrit in multiple threads. > In some cases, we experience a deadlock when an error happens at the end of > the key exchange, while sending pending packets: > {noformat} > Warning, the following threads are deadlocked : SSH git-upload-pack /project1 > (myuser), sshd-SshServer[df5f657]-nio2-thread-3 > "SSH git-upload-pack /project1 (myuser)" prio=1 BLOCKED > > org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107) > > org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798) > > org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227) > > org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127) > > org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183) > > org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174) > > org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153) > > 
org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433) > > org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621) > > org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171) > > org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947) > org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971) > org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776) > com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77) > > com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98) > > com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31) > > com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63) > com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467) > > com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83) > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > java.util.concurrent.FutureTask.run(FutureTask.java:266) > > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > java.lang.Thread.run(Thread.java:748) > "sshd-SshServer[df5f657]-nio2-thread-3" daemon prio=5 BLOCKED > > org.apache.sshd.common.channel.ChannelOutputStream.close(ChannelOutputStream.java:249) > org.apache.sshd.common.util.io.IoUtils.closeQuietly(IoUtils.java:151) > > org.apache.sshd.server.channel.ChannelSession.doCloseImmediately(ChannelSession.java:205) > > org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83) > > org.apache.sshd.common.channel.AbstractChannel.close(AbstractChannel.java:576) > > org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65) > > org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63) > > org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46) > > org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83) > > org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65) > > org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63) > > org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:56) > > org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:45) > > org.apache.sshd.common.util.closeable.SequentialCloseable.doClose(SequentialCloseable.java:69) > > org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63) > > 
org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46) > > org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83) > > org.apache.sshd.common.session.helpers.AbstractSession.exceptionCaught(AbstractSession.java:973) > > org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.exceptionCaught(AbstractSessionIoHandler.java:53) > > org.apache.sshd.common.io.nio2.Nio2Session.exceptionCaught(Nio2Session.java:186) > > org.apache.sshd.common.io.nio2.Nio2Session.handleWriteCycleFailure(Nio2Session.java:460) > > org.apache.sshd.common.io.nio2.Nio2Session$2.onFailed(Nio2Session.java:415) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$failed$1(Nio2CompletionHandler.java:46) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$627/1874891543.run(Unknown > Source) > java.security.AccessController.doPrivileged(Native Method) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.failed(Nio2CompletionHandler.java:45) > sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128) > sun.nio.ch.Invoker.invokeDirect(Invoker.java:157) > > sun.nio.ch.UnixAsynchronousSocketChannelImpl.implWrite(UnixAsynchronousSocketChannelImpl.java:736) > > sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:382) > > sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:399) > > org.apache.sshd.common.io.nio2.Nio2Session.doWriteCycle(Nio2Session.java:401) > > org.apache.sshd.common.io.nio2.Nio2Session.startWriting(Nio2Session.java:386) > > org.apache.sshd.common.io.nio2.Nio2Session.writePacket(Nio2Session.java:169) > > org.apache.sshd.common.session.helpers.AbstractSession.doWritePacket(AbstractSession.java:1181) > > org.apache.sshd.common.session.helpers.AbstractSession.sendPendingPackets(AbstractSession.java:910) > > org.apache.sshd.common.session.helpers.AbstractSession.handleNewKeys(AbstractSession.java:875) > > 
org.apache.sshd.common.session.helpers.AbstractSession.doHandleMessage(AbstractSession.java:606) > > org.apache.sshd.common.session.helpers.AbstractSession.handleMessage(AbstractSession.java:555) > > org.apache.sshd.common.session.helpers.AbstractSession.decode(AbstractSession.java:1527) > > org.apache.sshd.common.session.helpers.AbstractSession.messageReceived(AbstractSession.java:516) > > org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.messageReceived(AbstractSessionIoHandler.java:63) > > org.apache.sshd.common.io.nio2.Nio2Session.handleReadCycleCompletion(Nio2Session.java:339) > > org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:318) > > org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:315) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$completed$0(Nio2CompletionHandler.java:38) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$349/1690110070.run(Unknown > Source) > java.security.AccessController.doPrivileged(Native Method) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.completed(Nio2CompletionHandler.java:37) > sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) > sun.nio.ch.Invoker$2.run(Invoker.java:218) > > sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > java.lang.Thread.run(Thread.java:748) > "SSH git-upload-pack /project2 (myuser)" prio=1 BLOCKED > > org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107) > > org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798) > > org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227) > > org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127) > > 
org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183) > > org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174) > > org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153) > > org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433) > > org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621) > > org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171) > > org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947) > org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971) > org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776) > com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77) > > com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98) > > com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31) > > com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63) > com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467) > > 
com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83) > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > java.util.concurrent.FutureTask.run(FutureTask.java:266) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > java.lang.Thread.run(Thread.java:748) > {noformat} > In AbstractSession.handleNewKeys(), there is a lock on {{pendingPackets}}, > while trying to send these packets; when sending the packets fails, an > exception is generated, which causes the channel to get closed by calling > ChannelOutputStream.close(), which is a synchronized method. > At the same time, some other threads are trying to write data to the session: > they call ChannelOutputStream.write(), which is also a synchronized method, > which calls AbstractSession.writePacket(), which attempts to lock > {{pendingPackets}} to queue the packets. > *NOTE*: in our setup, we can reproduce this quite easily by increasing the > parallelism in git-repo, and reducing the values of the waitTimeout > (WAIT_FOR_SPACE_TIMEOUT), rekeyBytesLimit (REKEY_BYTES_LIMIT) and > rekeyTimeLimit (REKEY_TIME_LIMIT) configuration options. -- This message was sent by Atlassian Jira (v8.20.7#820007) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@mina.apache.org For additional commands, e-mail: dev-h...@mina.apache.org