[ 
https://issues.apache.org/jira/browse/SSHD-966?focusedWorklogId=761516&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761516
 ]

ASF GitHub Bot logged work on SSHD-966:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/22 21:04
            Start Date: 24/Apr/22 21:04
    Worklog Time Spent: 10m 
      Work Description: tomaswolf commented on code in PR #217:
URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857179032


##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages "in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+    // With asynchronous flushing we get a classic producer-consumer problem. The flushing thread is the single
+    // consumer, and there is a risk that it might get overrun by the producers. The classical solution of using a
+    // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we cannot make the producers block when the queue
+    // is full; we might deadlock or be unable to handle any incoming message.
+    //
+    // We need an unbounded queue that never blocks the producers, but that manages to throttle them such that the
+    // flushing thread can actually finish, and we still can handle incoming messages (in particular also the peer's
+    // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our own SSH_MSG_NEW_KEYS).
+    //
+    // This is achieved by giving the flushing thread priority over the threads that might enqueue additional packets
+    // and flushing at least two packets at a time. Additionally the flush loop releases and shortly afterwards
+    // re-acquires the write lock, so normally not many readers (i.e., writePacket() calls) will get a chance to enqueue
+    // new packets.
+
+    /**
+     * We need the flushing thread to have priority over writing threads. So we use a lock that favors writers over
+     * readers, and any state updates and the flushing thread are writers, while writePacket() is a reader.
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);

Review Comment:
   Done.
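
For readers of this thread, the locking scheme described in the comments of the hunk above can be illustrated with a minimal, self-contained sketch. It is not code from the PR: the class name, the String "packets", the ConcurrentLinkedQueue and the fixed chunk size of two are all assumptions made for illustration. Producers take the read lock and either send directly or enqueue; state changes and the flush loop take the write lock, and the flush loop releases the lock between chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch; not the MINA sshd KeyExchangeMessageHandler. */
final class FlushPrioritySketch {
    // Non-fair read/write lock, constructed as in the declaration in the hunk.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    // Assumption: concurrent queues, because several readers may enqueue at once.
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final Queue<String> sent = new ConcurrentLinkedQueue<>();
    private boolean kexOngoing; // guarded by lock

    /** State update: runs as a writer. */
    void beginKex() {
        lock.writeLock().lock();
        try {
            kexOngoing = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Producer path: runs as a reader. Returns true if sent directly, false if queued. */
    boolean writePacket(String packet) {
        lock.readLock().lock();
        try {
            if (kexOngoing || !pending.isEmpty()) {
                pending.add(packet);
                return false;
            }
            sent.add(packet);
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Consumer path: flushes up to two packets per write-lock acquisition. */
    void endKexAndFlush() {
        boolean more = true;
        while (more) {
            lock.writeLock().lock();
            try {
                kexOngoing = false;
                for (int i = 0; i < 2; i++) {
                    String p = pending.poll();
                    if (p == null) {
                        break;
                    }
                    sent.add(p);
                }
                more = !pending.isEmpty();
            } finally {
                // Releasing here gives waiting readers a brief chance to enqueue.
                lock.writeLock().unlock();
            }
        }
    }

    List<String> sent() {
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        FlushPrioritySketch s = new FlushPrioritySketch();
        s.writePacket("a");   // no key exchange: sent directly
        s.beginKex();
        s.writePacket("b");   // queued
        s.writePacket("c");   // queued
        s.writePacket("d");   // queued
        s.endKexAndFlush();
        System.out.println(s.sent());
    }
}
```

Because producers only ever enqueue while holding the read lock, they never block on a full queue; the throttling comes solely from having to wait for the write lock held by the flush loop.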



##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages "in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+    // With asynchronous flushing we get a classic producer-consumer problem. The flushing thread is the single
+    // consumer, and there is a risk that it might get overrun by the producers. The classical solution of using a
+    // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we cannot make the producers block when the queue
+    // is full; we might deadlock or be unable to handle any incoming message.
+    //
+    // We need an unbounded queue that never blocks the producers, but that manages to throttle them such that the
+    // flushing thread can actually finish, and we still can handle incoming messages (in particular also the peer's
+    // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our own SSH_MSG_NEW_KEYS).
+    //
+    // This is achieved by giving the flushing thread priority over the threads that might enqueue additional packets
+    // and flushing at least two packets at a time. Additionally the flush loop releases and shortly afterwards
+    // re-acquires the write lock, so normally not many readers (i.e., writePacket() calls) will get a chance to enqueue
+    // new packets.
+
+    /**
+     * We need the flushing thread to have priority over writing threads. So we use a lock that favors writers over
+     * readers, and any state updates and the flushing thread are writers, while writePacket() is a reader.
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
+
+    private final ExecutorService flushRunner = Executors.newSingleThreadExecutor();
+
+    private final AbstractSession session;
+
+    private final Logger log;
+
+    /**
+     * Queues up high-level packets written during an ongoing key exchange.
+     */
+    private final Queue<PendingWriteFuture> pendingPackets = new LinkedList<>();
+
+    /**
+     * Indicates that all pending packets have been flushed.
+     */
+    private boolean kexFlushed = true;
+
+    /**
+     * Never {@code null}. Used to block some threads when writing packets while pending packets are still being flushed
+     * at the end of a KEX to avoid overrunning the flushing thread. Always set, initially fulfilled. At the beginning
+     * of a KEX a new future is installed, which is fulfilled at the end of the KEX once there are no more pending
+     * packets to be flushed.
+     */
+    private DefaultKeyExchangeFuture kexFlushedFuture;
+
+    /**
+     * Creates a new {@link KeyExchangeMessageHandler} for the given {@code session}, using the given {@code Logger}.
+     *
+     * @param session {@link AbstractSession} the new instance belongs to
+     * @param log     {@link Logger} to use for writing log messages
+     */
+    public KeyExchangeMessageHandler(AbstractSession session, Logger log) {
+        this.session = Objects.requireNonNull(session);
+        this.log = Objects.requireNonNull(log);
+        // Start with a fulfilled kexFlushed future.
+        kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(), session.getFutureLock());
+        kexFlushedFuture.setValue(Boolean.TRUE);
+    }
+
+    public void updateState(Runnable update) {
+        lock.writeLock().lock();
+        try {
+            update.run();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public <V> V updateState(Supplier<V> update) {
+        lock.writeLock().lock();
+        try {
+            return update.get();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Initializes the state for a new key exchange. {@link #allPacketsFlushed()} will be {@code false}, and a new
+     * future to be fulfilled when all queued packets will be flushed once the key exchange is done is set. The
+     * currently set future from an earlier key exchange is returned. The returned future may or may not be fulfilled;
+     * if it isn't, there are still left-over pending packets to write from the previous key exchange, which will be
+     * written once the new key exchange flushes pending packets.
+     *
+     * @return the previous {@link DefaultKeyExchangeFuture} indicating whether all pending packets were flushed.
+     */
+    public DefaultKeyExchangeFuture initNewKeyExchange() {
+        return updateState(() -> {
+            kexFlushed = false;
+            DefaultKeyExchangeFuture oldFuture = kexFlushedFuture;
+            kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(), session.getFutureLock());
+            return oldFuture;
+        });
+    }
+
+    /**
+     * To be called when the key exchange is done. If there are any pending packets, returns a future that will be
+     * fulfilled when {@link #flushQueue(DefaultKeyExchangeFuture)} with that future as argument has flushed all pending
+     * packets, if there are any.
+     *
+     * @return the current {@link DefaultKeyExchangeFuture} and the number of currently pending packets
+     */
+    public SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> terminateKeyExchange() {
+        return updateState(() -> {
+            int numPending = pendingPackets.size();
+            if (numPending == 0) {
+                kexFlushed = true;
+            }
+            return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>(Integer.valueOf(numPending), kexFlushedFuture);
+        });
+    }
+
+    /**
+     * Pretends all pending packets had been written. To be called when the {@link AbstractSession} closes.
+     */
+    public void shutdown() {
+        SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> items = updateState(() -> {
+            kexFlushed = true;
+            return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>(
+                    Integer.valueOf(pendingPackets.size()),
+                    kexFlushedFuture);
+        });
+        items.getValue().setValue(Boolean.valueOf(items.getKey().intValue() == 0));
+        flushRunner.shutdownNow();
+    }
+
+    /**
+     * Writes a packet. If a key exchange is ongoing, only low-level messages are written directly; all other messages
+     * are queued and will be written once {@link #flushQueue(DefaultKeyExchangeFuture)} is called when the key exchange
+     * is done. Packets written while there are still pending packets to be flushed will either be queued, too, or the
+     * calling thread will be blocked with the given timeout until all packets have been flushed. Whether a write will
+     * be blocked is determined by {@link #isBlockAllowed(int)}.
+     * <p>
+     * If {@code timeout <= 0} or {@code unit == null}, a time-out of "forever" is assumed. Note that a timeout applies
+     * only if the calling thread is blocked.
+     * </p>
+     *
+     * @param  buffer      packet to write
+     * @param  timeout     number of {@link TimeUnit}s to wait at most if the calling thread is blocked
+     * @param  unit        {@link TimeUnit} of {@code timeout}
+     * @return             an {@link IoWriteFuture} that will be fulfilled once the packet has indeed been written.
+     * @throws IOException if an error occurs
+     */
+    public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException {
+        // While exchanging key, queue high level packets.
+        byte[] bufData = buffer.array();
+        int cmd = bufData[buffer.rpos()] & 0xFF;
+        boolean enqueued = false;
+        boolean isLowLevelMessage = cmd <= SshConstants.SSH_MSG_KEX_LAST && cmd != SshConstants.SSH_MSG_SERVICE_REQUEST
+                && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT;
+        try {
+            if (isLowLevelMessage) {
+                // Low-level messages can always be sent.
+                return session.doWritePacket(buffer);
+            }
+            IoWriteFuture future = writeOrEnqueue(cmd, buffer, timeout, unit);
+            enqueued = future instanceof PendingWriteFuture;
+            return future;
+        } finally {
+            session.resetIdleTimeout();
+            if (!enqueued) {
+                try {
+                    session.checkRekey();
+                } catch (GeneralSecurityException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("writePacket({}) failed ({}) to check re-key: {}", session, e.getClass().getSimpleName(),
+                                e.getMessage(), e);
+                    }
+                    throw ValidateUtils.initializeExceptionCause(new ProtocolException(
+                            "Failed (" + e.getClass().getSimpleName() + ")" + " to check re-key necessity: " + e.getMessage()),
+                            e);
+                } catch (Exception e) {
+                    ExceptionUtils.rethrowAsIoException(e);
+                }
+            }
+        }
+    }
+
+    private IoWriteFuture writeOrEnqueue(int cmd, Buffer buffer, long timeout, TimeUnit unit) throws IOException {

Review Comment:
   Done.
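
To make the isLowLevelMessage test in writePacket() above easier to follow, here is a small stand-alone sketch of the same classification. It is an illustration, not PR code: the class name is hypothetical, and the numeric constants are copied from the SSH RFCs (SSH_MSG_SERVICE_REQUEST = 5, SSH_MSG_SERVICE_ACCEPT = 6, SSH_MSG_NEWKEYS = 21, SSH_MSG_CHANNEL_DATA = 94). The value 49 for SSH_MSG_KEX_LAST is an assumption based on the key-exchange-method-specific range 30..49.

```java
/** Hypothetical sketch of the message classification; not the MINA sshd class. */
final class KexMessageClassSketch {
    static final int SSH_MSG_SERVICE_REQUEST = 5;
    static final int SSH_MSG_SERVICE_ACCEPT = 6;
    // Assumption: end of the key-exchange-method-specific range 30..49.
    static final int SSH_MSG_KEX_LAST = 49;

    /** Reads the SSH message number as an unsigned byte at the read position. */
    static int command(byte[] data, int rpos) {
        return data[rpos] & 0xFF;
    }

    /** Same boolean expression as in the hunk: such messages bypass the queue. */
    static boolean isLowLevelMessage(int cmd) {
        return cmd <= SSH_MSG_KEX_LAST
                && cmd != SSH_MSG_SERVICE_REQUEST
                && cmd != SSH_MSG_SERVICE_ACCEPT;
    }

    public static void main(String[] args) {
        int[] samples = { 2, 5, 6, 21, 94 };
        for (int cmd : samples) {
            System.out.println(cmd + " low-level: " + isLowLevelMessage(cmd));
        }
    }
}
```

The mask with 0xFF matters because Java bytes are signed: without it, message numbers of 128 and above would be read as negative and pass the "<= SSH_MSG_KEX_LAST" comparison.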





Issue Time Tracking
-------------------

    Worklog Id:     (was: 761516)
    Time Spent: 5h 20m  (was: 5h 10m)

> Deadlock on disconnection at the end of key-exchange
> ----------------------------------------------------
>
>                 Key: SSHD-966
>                 URL: https://issues.apache.org/jira/browse/SSHD-966
>             Project: MINA SSHD
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Francois Ferrand
>            Assignee: Lyor Goldstein
>            Priority: Major
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> We are using git-repo to download projects from a Gerrit server over SSH.
> Gerrit is at version 2.16.16, which uses SSHD 2.0.0 and Mina 2.0.17 with the
> NIO2 backend.
> One particularity of this setup is that git-repo creates a single control
> master channel, and then downloads *lots* of Git repositories (500
> repositories, some of them relatively large), with some degree of
> parallelism. This takes a long time and transfers lots of data, and the
> multiplexed connections are handled by Gerrit in multiple threads.
> In some cases, we experience a deadlock when an error happens at the end of
> the key exchange, while sending pending packets:
> {noformat}
> Warning, the following threads are deadlocked : SSH git-upload-pack /project1 (myuser), sshd-SshServer[df5f657]-nio2-thread-3
> "SSH git-upload-pack /project1 (myuser)" prio=1 BLOCKED
>       org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>       org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>       org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>       org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>       org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>       org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>       org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>       org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>       org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>       org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>       org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>       org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>       org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
>       org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
>       org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
>       com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>       com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>       com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>       com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
>       com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>       com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
>       java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       java.lang.Thread.run(Thread.java:748)
> "sshd-SshServer[df5f657]-nio2-thread-3" daemon prio=5 BLOCKED
>       org.apache.sshd.common.channel.ChannelOutputStream.close(ChannelOutputStream.java:249)
>       org.apache.sshd.common.util.io.IoUtils.closeQuietly(IoUtils.java:151)
>       org.apache.sshd.server.channel.ChannelSession.doCloseImmediately(ChannelSession.java:205)
>       org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>       org.apache.sshd.common.channel.AbstractChannel.close(AbstractChannel.java:576)
>       org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>       org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>       org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>       org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>       org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>       org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>       org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:56)
>       org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:45)
>       org.apache.sshd.common.util.closeable.SequentialCloseable.doClose(SequentialCloseable.java:69)
>       org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>       org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>       org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>       org.apache.sshd.common.session.helpers.AbstractSession.exceptionCaught(AbstractSession.java:973)
>       org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.exceptionCaught(AbstractSessionIoHandler.java:53)
>       org.apache.sshd.common.io.nio2.Nio2Session.exceptionCaught(Nio2Session.java:186)
>       org.apache.sshd.common.io.nio2.Nio2Session.handleWriteCycleFailure(Nio2Session.java:460)
>       org.apache.sshd.common.io.nio2.Nio2Session$2.onFailed(Nio2Session.java:415)
>       org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$failed$1(Nio2CompletionHandler.java:46)
>       org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$627/1874891543.run(Unknown Source)
>       java.security.AccessController.doPrivileged(Native Method)
>       org.apache.sshd.common.io.nio2.Nio2CompletionHandler.failed(Nio2CompletionHandler.java:45)
>       sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128)
>       sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
>       sun.nio.ch.UnixAsynchronousSocketChannelImpl.implWrite(UnixAsynchronousSocketChannelImpl.java:736)
>       sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:382)
>       sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:399)
>       org.apache.sshd.common.io.nio2.Nio2Session.doWriteCycle(Nio2Session.java:401)
>       org.apache.sshd.common.io.nio2.Nio2Session.startWriting(Nio2Session.java:386)
>       org.apache.sshd.common.io.nio2.Nio2Session.writePacket(Nio2Session.java:169)
>       org.apache.sshd.common.session.helpers.AbstractSession.doWritePacket(AbstractSession.java:1181)
>       org.apache.sshd.common.session.helpers.AbstractSession.sendPendingPackets(AbstractSession.java:910)
>       org.apache.sshd.common.session.helpers.AbstractSession.handleNewKeys(AbstractSession.java:875)
>       org.apache.sshd.common.session.helpers.AbstractSession.doHandleMessage(AbstractSession.java:606)
>       org.apache.sshd.common.session.helpers.AbstractSession.handleMessage(AbstractSession.java:555)
>       org.apache.sshd.common.session.helpers.AbstractSession.decode(AbstractSession.java:1527)
>       org.apache.sshd.common.session.helpers.AbstractSession.messageReceived(AbstractSession.java:516)
>       org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.messageReceived(AbstractSessionIoHandler.java:63)
>       org.apache.sshd.common.io.nio2.Nio2Session.handleReadCycleCompletion(Nio2Session.java:339)
>       org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:318)
>       org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:315)
>       org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$completed$0(Nio2CompletionHandler.java:38)
>       org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$349/1690110070.run(Unknown Source)
>       java.security.AccessController.doPrivileged(Native Method)
>       org.apache.sshd.common.io.nio2.Nio2CompletionHandler.completed(Nio2CompletionHandler.java:37)
>       sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
>       sun.nio.ch.Invoker$2.run(Invoker.java:218)
>       sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
>       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       java.lang.Thread.run(Thread.java:748)
> "SSH git-upload-pack /project2 (myuser)" prio=1 BLOCKED
>       org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>       org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>       org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>       org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>       org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>       org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>       org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>       org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>       org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>       org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>       org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>       org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>       org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>       org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
>       org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
>       org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
>       com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>       com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>       com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>       com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
>       com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>       com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
>       java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       java.lang.Thread.run(Thread.java:748)
> {noformat}
> In AbstractSession.handleNewKeys(), there is a lock on {{pendingPackets}}
> while trying to send these packets; when sending the packets fails, an
> exception is raised, which causes the channel to get closed by calling
> ChannelOutputStream.close(), which is a synchronized method.
> At the same time, another thread is trying to write data to the session:
> it calls ChannelOutputStream.write(), which is also a synchronized method,
> which calls AbstractSession.writePacket(), which attempts to lock
> {{pendingPackets}} to queue the packets.
> *NOTE*: in our setup, we can reproduce this quite easily by increasing the
> parallelism in git-repo and reducing the values of the waitTimeout
> (WAIT_FOR_SPACE_TIMEOUT), rekeyBytesLimit (REKEY_BYTES_LIMIT) and
> rekeyTimeLimit (REKEY_TIME_LIMIT) configuration options.
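
The lock-order inversion described in the issue can be reproduced in miniature. The sketch below is an illustration under stated assumptions, not MINA sshd code: the class and lock names are hypothetical stand-ins for the ChannelOutputStream monitor and the pendingPackets monitor, and ReentrantLock.tryLock with a timeout replaces synchronized so that the demo terminates instead of hanging. One thread takes the stream lock and then wants the queue lock (the write path); the other takes the queue lock and then wants the stream lock (the close path).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of the lock-order inversion; not MINA sshd code. */
final class LockOrderSketch {

    /**
     * Holds 'first', waits until both threads hold their first lock, then tries
     * 'second' with a timeout. 'first' is only released after both attempts ended.
     */
    static boolean holdFirstThenTrySecond(ReentrantLock first, ReentrantLock second,
            CountDownLatch bothHoldFirst, CountDownLatch attemptsDone) throws InterruptedException {
        first.lock();
        try {
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            boolean got = second.tryLock(200, TimeUnit.MILLISECONDS);
            if (got) {
                second.unlock();
            }
            attemptsDone.countDown();
            attemptsDone.await();
            return got;
        } finally {
            first.unlock();
        }
    }

    /** Returns { writerGotQueueLock, closerGotStreamLock }. */
    static boolean[] run() {
        ReentrantLock streamLock = new ReentrantLock(); // stands in for the stream monitor
        ReentrantLock queueLock = new ReentrantLock();  // stands in for the pendingPackets monitor
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch attemptsDone = new CountDownLatch(2);
        boolean[] result = new boolean[2];

        Thread writer = new Thread(() -> {
            try { // write path: stream lock first, then queue lock
                result[0] = holdFirstThenTrySecond(streamLock, queueLock, bothHoldFirst, attemptsDone);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread closer = new Thread(() -> {
            try { // close path: queue lock first, then stream lock
                result[1] = holdFirstThenTrySecond(queueLock, streamLock, bothHoldFirst, attemptsDone);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        closer.start();
        try {
            writer.join();
            closer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("writer got queue lock: " + r[0] + ", closer got stream lock: " + r[1]);
    }
}
```

Neither thread obtains its second lock in this setup. With plain synchronized blocks there is no timeout, so the same interleaving would leave both threads BLOCKED as in the thread dump above.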



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

