[ https://issues.apache.org/jira/browse/SSHD-966?focusedWorklogId=761516&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761516 ]
ASF GitHub Bot logged work on SSHD-966: --------------------------------------- Author: ASF GitHub Bot Created on: 24/Apr/22 21:04 Start Date: 24/Apr/22 21:04 Worklog Time Spent: 10m Work Description: tomaswolf commented on code in PR #217: URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857179032 ########## sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java: ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.session.helpers; + +import java.io.IOException; +import java.net.ProtocolException; +import java.security.GeneralSecurityException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.future.DefaultKeyExchangeFuture; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.kex.KexState; +import org.apache.sshd.common.util.ExceptionUtils; +import org.apache.sshd.common.util.ValidateUtils; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.threads.ThreadUtils; +import org.slf4j.Logger; + +/** + * Manages SSH message sending during a key exchange. RFC 4253 specifies that during a key exchange, no high-level + * messages are to be sent, but a receiver must be able to deal with messages "in flight" until the peer's + * {@link SshConstants#SSH_MSG_KEX_INIT} message is received. + * <p> + * Apache MINA sshd queues up high-level messages that threads try to send while a key exchange is ongoing, and sends + * them once the key exchange is done. Sending queued messages may make the peer re-trigger a new key exchange, in which + * case sending queued messages stops and is resumed at the end of the new key exchange. + * </p> + * + * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a> + */ +public class KeyExchangeMessageHandler { + + // With asynchronous flushing we get a classic producer-consumer problem. The flushing thread is the single + // consumer, and there is a risk that it might get overrun by the producers. The classical solution of using a + // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we cannot make the producers block when the queue + // is full; we might deadlock or be unable to handle any incoming message. + // + // We need an unbounded queue that never blocks the producers, but that manages to throttle them such that the + // flushing thread can actually finish, and we still can handle incoming messages (in particular also the peer's + // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our own SSH_MSG_NEW_KEYS). + // + // This is achieved by giving the flushing thread priority over the threads that might enqueue additional packets + // and flushing at least two packets at a time. Additionally the flush loop releases and shortly afterwards + // re-acquires the write lock, so normally not many readers (i.e., writePacket() calls) will get a chance to enqueue + // new packets. + + /** + * We need the flushing thread to have priority over writing threads. So we use a lock that favors writers over + * readers, and any state updates and the flushing thread are writers, while writePacket() is a reader. + */ + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); Review Comment: Done. ########## sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java: ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.session.helpers; + +import java.io.IOException; +import java.net.ProtocolException; +import java.security.GeneralSecurityException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.future.DefaultKeyExchangeFuture; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.kex.KexState; +import org.apache.sshd.common.util.ExceptionUtils; +import org.apache.sshd.common.util.ValidateUtils; +import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.threads.ThreadUtils; +import org.slf4j.Logger; + +/** + * Manages SSH message sending during a key exchange. RFC 4253 specifies that during a key exchange, no high-level + * messages are to be sent, but a receiver must be able to deal with messages "in flight" until the peer's + * {@link SshConstants#SSH_MSG_KEX_INIT} message is received. + * <p> + * Apache MINA sshd queues up high-level messages that threads try to send while a key exchange is ongoing, and sends + * them once the key exchange is done. Sending queued messages may make the peer re-trigger a new key exchange, in which + * case sending queued messages stops and is resumed at the end of the new key exchange. + * </p> + * + * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a> + */ +public class KeyExchangeMessageHandler { + + // With asynchronous flushing we get a classic producer-consumer problem. The flushing thread is the single + // consumer, and there is a risk that it might get overrun by the producers. The classical solution of using a + // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we cannot make the producers block when the queue + // is full; we might deadlock or be unable to handle any incoming message. + // + // We need an unbounded queue that never blocks the producers, but that manages to throttle them such that the + // flushing thread can actually finish, and we still can handle incoming messages (in particular also the peer's + // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our own SSH_MSG_NEW_KEYS). + // + // This is achieved by giving the flushing thread priority over the threads that might enqueue additional packets + // and flushing at least two packets at a time. Additionally the flush loop releases and shortly afterwards + // re-acquires the write lock, so normally not many readers (i.e., writePacket() calls) will get a chance to enqueue + // new packets. + + /** + * We need the flushing thread to have priority over writing threads. So we use a lock that favors writers over + * readers, and any state updates and the flushing thread are writers, while writePacket() is a reader. + */ + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); + + private final ExecutorService flushRunner = Executors.newSingleThreadExecutor(); + + private final AbstractSession session; + + private final Logger log; + + /** + * Queues up high-level packets written during an ongoing key exchange. + */ + private final Queue<PendingWriteFuture> pendingPackets = new LinkedList<>(); + + /** + * Indicates that all pending packets have been flushed. + */ + private boolean kexFlushed = true; + + /** + * Never {@code null}. Used to block some threads when writing packets while pending packets are still being flushed + * at the end of a KEX to avoid overrunning the flushing thread. Always set, initially fulfilled. At the beginning + * of a KEX a new future is installed, which is fulfilled at the end of the KEX once there are no more pending + * packets to be flushed. + */ + private DefaultKeyExchangeFuture kexFlushedFuture; + + /** + * Creates a new {@link KeyExchangeMessageHandler} for the given {@code session}, using the given {@code Logger}. + * + * @param session {@link AbstractSession} the new instance belongs to + * @param log {@link Logger} to use for writing log messages + */ + public KeyExchangeMessageHandler(AbstractSession session, Logger log) { + this.session = Objects.requireNonNull(session); + this.log = Objects.requireNonNull(log); + // Start with a fulfilled kexFlushed future. + kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(), session.getFutureLock()); + kexFlushedFuture.setValue(Boolean.TRUE); + } + + public void updateState(Runnable update) { + lock.writeLock().lock(); + try { + update.run(); + } finally { + lock.writeLock().unlock(); + } + } + + public <V> V updateState(Supplier<V> update) { + lock.writeLock().lock(); + try { + return update.get(); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Initializes the state for a new key exchange. {@link #allPacketsFlushed()} will be {@code false}, and a new + * future to be fulfilled when all queued packets will be flushed once the key exchange is done is set. The + * currently set future from an earlier key exchange is returned. The returned future may or may not be fulfilled; + * if it isn't, there are still left-over pending packets to write from the previous key exchange, which will be + * written once the new key exchange flushes pending packets. + * + * @return the previous {@link DefaultKeyExchangeFuture} indicating whether all pending packets were flushed. + */ + public DefaultKeyExchangeFuture initNewKeyExchange() { + return updateState(() -> { + kexFlushed = false; + DefaultKeyExchangeFuture oldFuture = kexFlushedFuture; + kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(), session.getFutureLock()); + return oldFuture; + }); + } + + /** + * To be called when the key exchange is done. If there are any pending packets, returns a future that will be + * fulfilled when {@link #flushQueue(DefaultKeyExchangeFuture)} with that future as argument has flushed all pending + * packets, if there are any. + * + * @return the current {@link DefaultKeyExchangeFuture} and the number of currently pending packets + */ + public SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> terminateKeyExchange() { + return updateState(() -> { + int numPending = pendingPackets.size(); + if (numPending == 0) { + kexFlushed = true; + } + return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>(Integer.valueOf(numPending), kexFlushedFuture); + }); + } + + /** + * Pretends all pending packets had been written. To be called when the {@link AbstractSession} closes. + */ + public void shutdown() { + SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> items = updateState(() -> { + kexFlushed = true; + return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>( + Integer.valueOf(pendingPackets.size()), + kexFlushedFuture); + }); + items.getValue().setValue(Boolean.valueOf(items.getKey().intValue() == 0)); + flushRunner.shutdownNow(); + } + + /** + * Writes a packet. If a key exchange is ongoing, only low-level messages are written directly; all other messages + * are queued and will be written once {@link #flushQueue(DefaultKeyExchangeFuture)} is called when the key exchange + * is done. Packets written while there are still pending packets to be flushed will either be queued, too, or the + * calling thread will be blocked with the given timeout until all packets have been flushed. Whether a write will + * be blocked is determined by {@link #isBlockAllowed(int)}. + * <p> + * If {@code timeout <= 0} or {@code unit == null}, a time-out of "forever" is assumed. Note that a timeout applies + * only if the calling thread is blocked. + * </p> + * + * @param buffer packet to write + * @param timeout number of {@link TimeUnit}s to wait at most if the calling thread is blocked + * @param unit {@link TimeUnit} of {@code timeout} + * @return an {@link IoWriteFuture} that will be fulfilled once the packet has indeed been written. + * @throws IOException if an error occurs + */ + public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException { + // While exchanging key, queue high level packets. + byte[] bufData = buffer.array(); + int cmd = bufData[buffer.rpos()] & 0xFF; + boolean enqueued = false; + boolean isLowLevelMessage = cmd <= SshConstants.SSH_MSG_KEX_LAST && cmd != SshConstants.SSH_MSG_SERVICE_REQUEST + && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT; + try { + if (isLowLevelMessage) { + // Low-level messages can always be sent. + return session.doWritePacket(buffer); + } + IoWriteFuture future = writeOrEnqueue(cmd, buffer, timeout, unit); + enqueued = future instanceof PendingWriteFuture; + return future; + } finally { + session.resetIdleTimeout(); + if (!enqueued) { + try { + session.checkRekey(); + } catch (GeneralSecurityException e) { + if (log.isDebugEnabled()) { + log.debug("writePacket({}) failed ({}) to check re-key: {}", session, e.getClass().getSimpleName(), + e.getMessage(), e); + } + throw ValidateUtils.initializeExceptionCause(new ProtocolException( + "Failed (" + e.getClass().getSimpleName() + ")" + " to check re-key necessity: " + e.getMessage()), + e); + } catch (Exception e) { + ExceptionUtils.rethrowAsIoException(e); + } + } + } + } + + private IoWriteFuture writeOrEnqueue(int cmd, Buffer buffer, long timeout, TimeUnit unit) throws IOException { Review Comment: Done. Issue Time Tracking ------------------- Worklog Id: (was: 761516) Time Spent: 5h 20m (was: 5h 10m) > Deadlock on disconnection at the end of key-exchange > ---------------------------------------------------- > > Key: SSHD-966 > URL: https://issues.apache.org/jira/browse/SSHD-966 > Project: MINA SSHD > Issue Type: Bug > Affects Versions: 2.0.0 > Reporter: Francois Ferrand > Assignee: Lyor Goldstein > Priority: Major > Time Spent: 5h 20m > Remaining Estimate: 0h > > We are using git-repo to download projects from Gerrit server, using SSH. > Gerrit is in version 2.16.16. which uses SSHD 2.0.0 and Mina 2.0.17 with NIO2 > backend. > One particularity of this setup is that git-repo creates a single control > master channel, and then downloads *lots* of Git repositories (500 > repositories, some of them relatively large), with some degree of > parallelism. This takes a long time, lots of data, and the multiplexed > connections are handled by gerrit in multiple threads. > In some cases, we experience a deadlock when an error happens at the end of > the key exchange, while sending pending packets: > {noformat} > Warning, the following threads are deadlocked : SSH git-upload-pack /project1 > (myuser), sshd-SshServer[df5f657]-nio2-thread-3 > "SSH git-upload-pack /project1 (myuser)" prio=1 BLOCKED > > org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107) > > org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798) > > org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227) > > org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127) > > org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183) > > org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174) > > org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153) > > org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433) > > org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621) > > org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171) > > org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947) > org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971) > org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776) > com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77) > > com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98) > > com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31) > > com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63) > com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467) > > com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83) > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > java.util.concurrent.FutureTask.run(FutureTask.java:266) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > java.lang.Thread.run(Thread.java:748) > "sshd-SshServer[df5f657]-nio2-thread-3" daemon prio=5 BLOCKED > > org.apache.sshd.common.channel.ChannelOutputStream.close(ChannelOutputStream.java:249) > org.apache.sshd.common.util.io.IoUtils.closeQuietly(IoUtils.java:151) > > org.apache.sshd.server.channel.ChannelSession.doCloseImmediately(ChannelSession.java:205) > > org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83) > > org.apache.sshd.common.channel.AbstractChannel.close(AbstractChannel.java:576) > > org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65) > > org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63) > > org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46) > > org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83) > > org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65) > > org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63) > > org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:56) > > org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:45) > > org.apache.sshd.common.util.closeable.SequentialCloseable.doClose(SequentialCloseable.java:69) > > org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63) > > org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46) > > org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83) > > org.apache.sshd.common.session.helpers.AbstractSession.exceptionCaught(AbstractSession.java:973) > > org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.exceptionCaught(AbstractSessionIoHandler.java:53) > > org.apache.sshd.common.io.nio2.Nio2Session.exceptionCaught(Nio2Session.java:186) > > org.apache.sshd.common.io.nio2.Nio2Session.handleWriteCycleFailure(Nio2Session.java:460) > > org.apache.sshd.common.io.nio2.Nio2Session$2.onFailed(Nio2Session.java:415) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$failed$1(Nio2CompletionHandler.java:46) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$627/1874891543.run(Unknown > Source) > java.security.AccessController.doPrivileged(Native Method) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.failed(Nio2CompletionHandler.java:45) > sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128) > sun.nio.ch.Invoker.invokeDirect(Invoker.java:157) > > sun.nio.ch.UnixAsynchronousSocketChannelImpl.implWrite(UnixAsynchronousSocketChannelImpl.java:736) > > sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:382) > > sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:399) > > org.apache.sshd.common.io.nio2.Nio2Session.doWriteCycle(Nio2Session.java:401) > > org.apache.sshd.common.io.nio2.Nio2Session.startWriting(Nio2Session.java:386) > > org.apache.sshd.common.io.nio2.Nio2Session.writePacket(Nio2Session.java:169) > > org.apache.sshd.common.session.helpers.AbstractSession.doWritePacket(AbstractSession.java:1181) > > org.apache.sshd.common.session.helpers.AbstractSession.sendPendingPackets(AbstractSession.java:910) > > org.apache.sshd.common.session.helpers.AbstractSession.handleNewKeys(AbstractSession.java:875) > > org.apache.sshd.common.session.helpers.AbstractSession.doHandleMessage(AbstractSession.java:606) > > org.apache.sshd.common.session.helpers.AbstractSession.handleMessage(AbstractSession.java:555) > > org.apache.sshd.common.session.helpers.AbstractSession.decode(AbstractSession.java:1527) > > org.apache.sshd.common.session.helpers.AbstractSession.messageReceived(AbstractSession.java:516) > > org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.messageReceived(AbstractSessionIoHandler.java:63) > > org.apache.sshd.common.io.nio2.Nio2Session.handleReadCycleCompletion(Nio2Session.java:339) > > org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:318) > > org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:315) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$completed$0(Nio2CompletionHandler.java:38) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$349/1690110070.run(Unknown > Source) > java.security.AccessController.doPrivileged(Native Method) > > org.apache.sshd.common.io.nio2.Nio2CompletionHandler.completed(Nio2CompletionHandler.java:37) > sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) > sun.nio.ch.Invoker$2.run(Invoker.java:218) > > sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > java.lang.Thread.run(Thread.java:748) > "SSH git-upload-pack /project2 (myuser)" prio=1 BLOCKED > > org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107) > > org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798) > > org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227) > > org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127) > > org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183) > > org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174) > > org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153) > > org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614) > > org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433) > > org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621) > > org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171) > > org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597) > > org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133) > org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947) > org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971) > org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776) > com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77) > > com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98) > > com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31) > > com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63) > com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467) > > com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83) > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > java.util.concurrent.FutureTask.run(FutureTask.java:266) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > java.lang.Thread.run(Thread.java:748) > {noformat} > In AbstractSession.handleNewKeys(), there is a lock on {{pendingEvents}}, > while trying to send these packets; when sending the packets fails, an > exception is generated, which causes the channel to get closed by calling > ChannelOutputStream.close(), which is a synchronized method. > At the same time, some other threads are trying to write data to the session: > it calls ChannelOutputStream.write(), which is also a synchronized method, > which calls AbstractSession.writePacket() which attempts to lock > {{pendingPackets}} to queue the packets. > *NOTE*: in our setup, we can reproduce this quite easily by increasing the > parallelism in git-repo, reducing the values of waitTimeout > (WAIT_FOR_SPACE_TIMEOUT), rekeyBytesLimit (REKEY_TIME_LIMIT) and > rekeyTimeLimit (REKEY_TIME_LIMIT) configuration options. -- This message was sent by Atlassian Jira (v8.20.7#820007) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@mina.apache.org For additional commands, e-mail: dev-h...@mina.apache.org