[
https://issues.apache.org/jira/browse/SSHD-966?focusedWorklogId=761510&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761510
]
ASF GitHub Bot logged work on SSHD-966:
---------------------------------------
Author: ASF GitHub Bot
Created on: 24/Apr/22 20:59
Start Date: 24/Apr/22 20:59
Worklog Time Spent: 10m
Work Description: tomaswolf commented on code in PR #217:
URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857178515
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -90,52 +93,174 @@ public byte getCommandType() {
return cmd;
}
- public void onWindowExpanded() throws IOException {
- doWriteIfPossible(true);
- }
-
+ /**
+ * {@inheritDoc}
+ *
+ * This write operation is <em>asynchronous</em>: if there is not enough
window space, it may keep the write pending
+ * or write only part of the buffer and keep the rest pending. Concurrent
writes are not allowed and will throw a
+ * {@link WritePendingException}. Any subsequent write <em>must</em> occur
only once the returned future is
+ * fulfilled; for instance triggered via a listener on the returned
future. Try to avoid doing a subsequent write
+ * directly in a future listener, though; doing so may lead to deep chains
of nested listener calls with deep stack
+ * traces, and may ultimately lead to a stack overflow.
+ *
+ * @throws WritePendingException if a concurrent write is attempted
+ */
@Override
- public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws
IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
- throw new EOFException("Closing: " + state);
+ throw new EOFException("Closing: " + writeState);
}
IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId,
buffer);
- if (!pendingWrite.compareAndSet(null, future)) {
- throw new WritePendingException("A write operation is already
pending");
+ synchronized (writeState) {
+ if (writeState.isClosing()) { // Double check.
+ throw new EOFException("Closing: " + writeState);
+ }
+ if (writeState.writeInProgress) {
+ throw new WritePendingException("A write operation is already
pending");
+ }
+ writeState.lastWrite = future;
+ writeState.pendingWrite = future;
+ writeState.writeInProgress = true;
+ writeState.waitingOnIo = false;
}
doWriteIfPossible(false);
return future;
}
@Override
protected void preClose() {
- if (!(packetWriter instanceof Channel)) {
- try {
- packetWriter.close();
- } catch (IOException e) {
- error("preClose({}) Failed ({}) to pre-close packet writer:
{}",
- this, e.getClass().getSimpleName(), e.getMessage(), e);
+ synchronized (writeState) {
+ writeState.openState = state.get();
+ }
+ super.preClose();
+ }
+
+ @Override
+ protected void doCloseImmediately() {
+ try {
+ // Can't close this in preClose(); a graceful close waits for the
currently pending write to finish and thus
+ // still needs the packet writer.
+ if (!(packetWriter instanceof Channel)) {
+ try {
+ packetWriter.close();
+ } catch (IOException e) {
+ error("preClose({}) Failed ({}) to pre-close packet
writer: {}",
+ this, e.getClass().getSimpleName(),
e.getMessage(), e);
+ }
}
+ super.doCloseImmediately();
+ } finally {
+ shutdown();
}
+ }
- super.preClose();
+ private void shutdown() {
+ IoWriteFutureImpl current = null;
+ synchronized (writeState) {
+ writeState.openState = State.Closed;
+ current = writeState.pendingWrite;
+ writeState.pendingWrite = null;
+ writeState.waitingOnIo = false;
+ }
+ if (current != null) {
+ terminateFuture(current);
+ }
+ }
+
+ private void terminateFuture(IoWriteFutureImpl future) {
Review Comment:
Done.
Issue Time Tracking
-------------------
Worklog Id: (was: 761510)
Time Spent: 4h 20m (was: 4h 10m)
> Deadlock on disconnection at the end of key-exchange
> ----------------------------------------------------
>
> Key: SSHD-966
> URL: https://issues.apache.org/jira/browse/SSHD-966
> Project: MINA SSHD
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Francois Ferrand
> Assignee: Lyor Goldstein
> Priority: Major
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> We are using git-repo to download projects from Gerrit server, using SSH.
> Gerrit is in version 2.16.16. which uses SSHD 2.0.0 and Mina 2.0.17 with NIO2
> backend.
> One particularity of this setup is that git-repo creates a single control
> master channel, and then downloads *lots* of Git repositories (500
> repositories, some of them relatively large), with some degree of
> parallelism. This takes a long time, lots of data, and the multiplexed
> connections are handled by gerrit in multiple threads.
> In some cases, we experience a deadlock when an error happens at the end of
> the key exchange, while sending pending packets:
> {noformat}
> Warning, the following threads are deadlocked : SSH git-upload-pack /project1
> (myuser), sshd-SshServer[df5f657]-nio2-thread-3
> "SSH git-upload-pack /project1 (myuser)" prio=1 BLOCKED
>
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
> org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
> org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
> com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
> com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> "sshd-SshServer[df5f657]-nio2-thread-3" daemon prio=5 BLOCKED
>
> org.apache.sshd.common.channel.ChannelOutputStream.close(ChannelOutputStream.java:249)
> org.apache.sshd.common.util.io.IoUtils.closeQuietly(IoUtils.java:151)
>
> org.apache.sshd.server.channel.ChannelSession.doCloseImmediately(ChannelSession.java:205)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.channel.AbstractChannel.close(AbstractChannel.java:576)
>
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:56)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:45)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable.doClose(SequentialCloseable.java:69)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.session.helpers.AbstractSession.exceptionCaught(AbstractSession.java:973)
>
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.exceptionCaught(AbstractSessionIoHandler.java:53)
>
> org.apache.sshd.common.io.nio2.Nio2Session.exceptionCaught(Nio2Session.java:186)
>
> org.apache.sshd.common.io.nio2.Nio2Session.handleWriteCycleFailure(Nio2Session.java:460)
>
> org.apache.sshd.common.io.nio2.Nio2Session$2.onFailed(Nio2Session.java:415)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$failed$1(Nio2CompletionHandler.java:46)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$627/1874891543.run(Unknown
> Source)
> java.security.AccessController.doPrivileged(Native Method)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.failed(Nio2CompletionHandler.java:45)
> sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128)
> sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
>
> sun.nio.ch.UnixAsynchronousSocketChannelImpl.implWrite(UnixAsynchronousSocketChannelImpl.java:736)
>
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:382)
>
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:399)
>
> org.apache.sshd.common.io.nio2.Nio2Session.doWriteCycle(Nio2Session.java:401)
>
> org.apache.sshd.common.io.nio2.Nio2Session.startWriting(Nio2Session.java:386)
>
> org.apache.sshd.common.io.nio2.Nio2Session.writePacket(Nio2Session.java:169)
>
> org.apache.sshd.common.session.helpers.AbstractSession.doWritePacket(AbstractSession.java:1181)
>
> org.apache.sshd.common.session.helpers.AbstractSession.sendPendingPackets(AbstractSession.java:910)
>
> org.apache.sshd.common.session.helpers.AbstractSession.handleNewKeys(AbstractSession.java:875)
>
> org.apache.sshd.common.session.helpers.AbstractSession.doHandleMessage(AbstractSession.java:606)
>
> org.apache.sshd.common.session.helpers.AbstractSession.handleMessage(AbstractSession.java:555)
>
> org.apache.sshd.common.session.helpers.AbstractSession.decode(AbstractSession.java:1527)
>
> org.apache.sshd.common.session.helpers.AbstractSession.messageReceived(AbstractSession.java:516)
>
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.messageReceived(AbstractSessionIoHandler.java:63)
>
> org.apache.sshd.common.io.nio2.Nio2Session.handleReadCycleCompletion(Nio2Session.java:339)
>
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:318)
>
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:315)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$completed$0(Nio2CompletionHandler.java:38)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$349/1690110070.run(Unknown
> Source)
> java.security.AccessController.doPrivileged(Native Method)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.completed(Nio2CompletionHandler.java:37)
> sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
> sun.nio.ch.Invoker$2.run(Invoker.java:218)
>
> sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> "SSH git-upload-pack /project2 (myuser)" prio=1 BLOCKED
>
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
> org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
> org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
> com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
> com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> {noformat}
> In AbstractSession.handleNewKeys(), there is a lock on {{pendingEvents}},
> while trying to send these packets; when sending the packets fails, an
> exception is generated, which causes the channel to get closed by calling
> ChannelOutputStream.close(), which is a synchronized method.
> At the same time, some other threads are trying to write data to the session:
> it calls ChannelOutputStream.write(), which is also a synchronized method,
> which calls AbstractSession.writePacket() which attempts to lock
> {{pendingPackets}} to queue the packets.
> *NOTE*: in our setup, we can reproduce this quite easily by increasing the
> parallelism in git-repo, reducing the values of waitTimeout
> (WAIT_FOR_SPACE_TIMEOUT), rekeyBytesLimit (REKEY_TIME_LIMIT) and
> rekeyTimeLimit (REKEY_TIME_LIMIT) configuration options.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]