[
https://issues.apache.org/jira/browse/SSHD-966?focusedWorklogId=761480&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761480
]
ASF GitHub Bot logged work on SSHD-966:
---------------------------------------
Author: ASF GitHub Bot
Created on: 24/Apr/22 17:21
Start Date: 24/Apr/22 17:21
Worklog Time Spent: 10m
Work Description: lgoldstein commented on code in PR #217:
URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857153738
##########
sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java:
##########
@@ -70,6 +70,10 @@ public class ClientUserAuthService extends AbstractCloseable
implements Service,
private UserAuth userAuth;
private int currentMethod;
+ private Object initLock = new Object();
Review Comment:
Recommend making *initLock* `final`
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -90,52 +93,174 @@ public byte getCommandType() {
return cmd;
}
- public void onWindowExpanded() throws IOException {
- doWriteIfPossible(true);
- }
-
+ /**
+ * {@inheritDoc}
+ *
+ * This write operation is <em>asynchronous</em>: if there is not enough
window space, it may keep the write pending
+ * or write only part of the buffer and keep the rest pending. Concurrent
writes are not allowed and will throw a
+ * {@link WritePendingException}. Any subsequent write <em>must</em> occur
only once the returned future is
+ * fulfilled; for instance triggered via a listener on the returned
future. Try to avoid doing a subsequent write
+ * directly in a future listener, though; doing so may lead to deep chains
of nested listener calls with deep stack
+ * traces, and may ultimately lead to a stack overflow.
+ *
+ * @throws WritePendingException if a concurrent write is attempted
+ */
@Override
- public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws
IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
- throw new EOFException("Closing: " + state);
+ throw new EOFException("Closing: " + writeState);
}
IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId,
buffer);
- if (!pendingWrite.compareAndSet(null, future)) {
- throw new WritePendingException("A write operation is already
pending");
+ synchronized (writeState) {
+ if (writeState.isClosing()) { // Double check.
+ throw new EOFException("Closing: " + writeState);
+ }
+ if (writeState.writeInProgress) {
+ throw new WritePendingException("A write operation is already
pending");
+ }
+ writeState.lastWrite = future;
+ writeState.pendingWrite = future;
+ writeState.writeInProgress = true;
+ writeState.waitingOnIo = false;
}
doWriteIfPossible(false);
return future;
}
@Override
protected void preClose() {
- if (!(packetWriter instanceof Channel)) {
- try {
- packetWriter.close();
- } catch (IOException e) {
- error("preClose({}) Failed ({}) to pre-close packet writer:
{}",
- this, e.getClass().getSimpleName(), e.getMessage(), e);
+ synchronized (writeState) {
+ writeState.openState = state.get();
+ }
+ super.preClose();
+ }
+
+ @Override
+ protected void doCloseImmediately() {
+ try {
+ // Can't close this in preClose(); a graceful close waits for the
currently pending write to finish and thus
+ // still needs the packet writer.
+ if (!(packetWriter instanceof Channel)) {
+ try {
+ packetWriter.close();
+ } catch (IOException e) {
+ error("preClose({}) Failed ({}) to pre-close packet
writer: {}",
+ this, e.getClass().getSimpleName(),
e.getMessage(), e);
+ }
}
+ super.doCloseImmediately();
+ } finally {
+ shutdown();
}
+ }
- super.preClose();
+ private void shutdown() {
+ IoWriteFutureImpl current = null;
+ synchronized (writeState) {
+ writeState.openState = State.Closed;
+ current = writeState.pendingWrite;
+ writeState.pendingWrite = null;
+ writeState.waitingOnIo = false;
+ }
+ if (current != null) {
+ terminateFuture(current);
+ }
+ }
+
+ private void terminateFuture(IoWriteFutureImpl future) {
Review Comment:
Let's make this *protected* in the spirit of allowing users to overload
anything they want in our code.
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -282,4 +435,43 @@ public boolean
isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean
sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
this.sendChunkIfRemoteWindowIsSmallerThanPacketSize =
sendChunkIfRemoteWindowIsSmallerThanPacketSize;
}
+
+ // Marker type to avoid repeated buffering
+ private static class BufferedFuture extends IoWriteFutureImpl {
+
+ BufferedFuture(Object id, Buffer buffer) {
+ super(id, buffer);
+ }
+ }
+
+ // Collects state variables; access is always synchronized on the single
instance per stream.
+ private static class WriteState {
Review Comment:
Let's make this *protected* (as well as its constructor, fields and methods)
in the spirit of allowing users to overload anything they want in our code.
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -171,43 +227,52 @@ public synchronized void write(byte[] buf, int s, int l)
throws IOException {
"Interrupted while waiting for remote space on
write len=" + l + " to " + this)
.initCause(e);
}
- }
- session.resetIdleTimeout();
- continue;
+ session.resetIdleTimeout();
+ break;
+ default:
+ // BUFFERED implies l == 0; outer loop will terminate
+ break;
Review Comment:
No need for *break* in *default* clause.
##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+ // With asynchronous flushing we get a classic producer-consumer problem.
The flushing thread is the single
+ // consumer, and there is a risk that it might get overrun by the
producers. The classical solution of using a
+ // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we
cannot make the producers block when the queue
+ // is full; we might deadlock or be unable to handle any incoming message.
+ //
+ // We need an unbounded queue that never blocks the producers, but that
manages to throttle them such that the
+ // flushing thread can actually finish, and we still can handle incoming
messages (in particular also the peer's
+ // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our
own SSH_MSG_NEW_KEYS).
+ //
+ // This is achieved by giving the flushing thread priority over the
threads that might enqueue additional packets
+ // and flushing at least two packets at a time. Additionally the flush
loop releases and shortly afterwards
+ // re-acquires the write lock, so normally not many readers (i.e.,
writePacket() calls) will get a chance to enqueue
+ // new packets.
+
+ /**
+ * We need the flushing thread to have priority over writing threads. So
we use a lock that favors writers over
+ * readers, and any state updates and the flushing thread are writers,
while writePacket() is a reader.
+ */
+ private final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock(false);
Review Comment:
Let's make the members *protected* in the spirit of letting our users
override whatever they like - e,g, provide their own derived handler.
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -42,6 +43,19 @@
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
public class ChannelOutputStream extends OutputStream implements
java.nio.channels.Channel, ChannelHolder {
+
+ private enum WriteState {
Review Comment:
Let's make this *protected* (or even *public*) in the spirit of allowing
users to overload anything they want in our code.
##########
sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java:
##########
@@ -124,7 +128,15 @@ public Map<String, Object> getProperties() {
@Override
public void start() {
- // ignored
+ Runnable initial = null;
Review Comment:
No need to initialize *initial* to *null* since its value is set
unconditionally inside the *synchronized* block.
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -90,52 +93,174 @@ public byte getCommandType() {
return cmd;
}
- public void onWindowExpanded() throws IOException {
- doWriteIfPossible(true);
- }
-
+ /**
+ * {@inheritDoc}
+ *
+ * This write operation is <em>asynchronous</em>: if there is not enough
window space, it may keep the write pending
+ * or write only part of the buffer and keep the rest pending. Concurrent
writes are not allowed and will throw a
+ * {@link WritePendingException}. Any subsequent write <em>must</em> occur
only once the returned future is
+ * fulfilled; for instance triggered via a listener on the returned
future. Try to avoid doing a subsequent write
+ * directly in a future listener, though; doing so may lead to deep chains
of nested listener calls with deep stack
+ * traces, and may ultimately lead to a stack overflow.
+ *
+ * @throws WritePendingException if a concurrent write is attempted
+ */
@Override
- public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws
IOException {
+ public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
- throw new EOFException("Closing: " + state);
+ throw new EOFException("Closing: " + writeState);
}
IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId,
buffer);
- if (!pendingWrite.compareAndSet(null, future)) {
- throw new WritePendingException("A write operation is already
pending");
+ synchronized (writeState) {
+ if (writeState.isClosing()) { // Double check.
+ throw new EOFException("Closing: " + writeState);
+ }
+ if (writeState.writeInProgress) {
+ throw new WritePendingException("A write operation is already
pending");
+ }
+ writeState.lastWrite = future;
+ writeState.pendingWrite = future;
+ writeState.writeInProgress = true;
+ writeState.waitingOnIo = false;
}
doWriteIfPossible(false);
return future;
}
@Override
protected void preClose() {
- if (!(packetWriter instanceof Channel)) {
- try {
- packetWriter.close();
- } catch (IOException e) {
- error("preClose({}) Failed ({}) to pre-close packet writer:
{}",
- this, e.getClass().getSimpleName(), e.getMessage(), e);
+ synchronized (writeState) {
+ writeState.openState = state.get();
+ }
+ super.preClose();
+ }
+
+ @Override
+ protected void doCloseImmediately() {
+ try {
+ // Can't close this in preClose(); a graceful close waits for the
currently pending write to finish and thus
+ // still needs the packet writer.
+ if (!(packetWriter instanceof Channel)) {
+ try {
+ packetWriter.close();
+ } catch (IOException e) {
+ error("preClose({}) Failed ({}) to pre-close packet
writer: {}",
+ this, e.getClass().getSimpleName(),
e.getMessage(), e);
+ }
}
+ super.doCloseImmediately();
+ } finally {
+ shutdown();
}
+ }
- super.preClose();
+ private void shutdown() {
Review Comment:
Let's make this *protected* in the spirit of allowing users to overload
anything they want in our code.
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -50,12 +64,15 @@ public class ChannelOutputStream extends OutputStream
implements java.nio.channe
private final Duration maxWaitTimeout;
private final byte cmd;
private final boolean eofOnClose;
- private final byte[] b = new byte[1];
- private final AtomicBoolean closedState = new AtomicBoolean(false);
+ private final AtomicReference<OpenState> openState = new
AtomicReference<>(OpenState.OPEN);
+
+ private final Object bufferLock = new Object();
private Buffer buffer;
private int bufferLength;
private int lastSize;
- private boolean noDelay;
+ private boolean isFlushing;
+
+ private volatile boolean noDelay;
Review Comment:
Personally I am not a big fan of *volatile* as means of multi-threading
safety - I prefer *AtomicBoolean*-s - but I defer to your judgement here if you
prefer *volatile*
##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java:
##########
@@ -712,6 +762,17 @@ protected boolean handleServiceRequest(String serviceName,
Buffer buffer) throws
return true;
}
+ private boolean validateServiceKexState(KexState state) {
Review Comment:
Let's make this *protected* in the spirit of letting our users override
whatever they like.
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -42,6 +43,19 @@
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
public class ChannelOutputStream extends OutputStream implements
java.nio.channels.Channel, ChannelHolder {
+
+ private enum WriteState {
+ BUFFERED,
+ NEED_FLUSH,
+ NEED_SPACE
+ }
+
+ private enum OpenState {
Review Comment:
Let's make this *protected* (or even *public*) in the spirit of allowing
users to overload anything they want in our code.
##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -282,4 +435,43 @@ public boolean
isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean
sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
this.sendChunkIfRemoteWindowIsSmallerThanPacketSize =
sendChunkIfRemoteWindowIsSmallerThanPacketSize;
}
+
+ // Marker type to avoid repeated buffering
+ private static class BufferedFuture extends IoWriteFutureImpl {
Review Comment:
Let's make this *protected* (as well as its constructor and methods) in the
spirit of allowing users to overload anything they want in our code.
##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+ // With asynchronous flushing we get a classic producer-consumer problem.
The flushing thread is the single
+ // consumer, and there is a risk that it might get overrun by the
producers. The classical solution of using a
+ // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we
cannot make the producers block when the queue
+ // is full; we might deadlock or be unable to handle any incoming message.
+ //
+ // We need an unbounded queue that never blocks the producers, but that
manages to throttle them such that the
+ // flushing thread can actually finish, and we still can handle incoming
messages (in particular also the peer's
+ // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our
own SSH_MSG_NEW_KEYS).
+ //
+ // This is achieved by giving the flushing thread priority over the
threads that might enqueue additional packets
+ // and flushing at least two packets at a time. Additionally the flush
loop releases and shortly afterwards
+ // re-acquires the write lock, so normally not many readers (i.e.,
writePacket() calls) will get a chance to enqueue
+ // new packets.
+
+ /**
+ * We need the flushing thread to have priority over writing threads. So
we use a lock that favors writers over
+ * readers, and any state updates and the flushing thread are writers,
while writePacket() is a reader.
+ */
+ private final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock(false);
+
+ private final ExecutorService flushRunner =
Executors.newSingleThreadExecutor();
+
+ private final AbstractSession session;
+
+ private final Logger log;
+
+ /**
+ * Queues up high-level packets written during an ongoing key exchange.
+ */
+ private final Queue<PendingWriteFuture> pendingPackets = new
LinkedList<>();
+
+ /**
+ * Indicates that all pending packets have been flushed.
+ */
+ private boolean kexFlushed = true;
+
+ /**
+ * Never {@code null}. Used to block some threads when writing packets
while pending packets are still being flushed
+ * at the end of a KEX to avoid overrunning the flushing thread. Always
set, initially fulfilled. At the beginning
+ * of a KEX a new future is installed, which is fulfilled at the end of
the KEX once there are no more pending
+ * packets to be flushed.
+ */
+ private DefaultKeyExchangeFuture kexFlushedFuture;
+
+ /**
+ * Creates a new {@link KeyExchangeMessageHandler} for the given {@code
session}, using the given {@code Logger}.
+ *
+ * @param session {@link AbstractSession} the new instance belongs to
+ * @param log {@link Logger} to use for writing log messages
+ */
+ public KeyExchangeMessageHandler(AbstractSession session, Logger log) {
+ this.session = Objects.requireNonNull(session);
+ this.log = Objects.requireNonNull(log);
+ // Start with a fulfilled kexFlushed future.
+ kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(),
session.getFutureLock());
+ kexFlushedFuture.setValue(Boolean.TRUE);
+ }
+
+ public void updateState(Runnable update) {
+ lock.writeLock().lock();
+ try {
+ update.run();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public <V> V updateState(Supplier<V> update) {
+ lock.writeLock().lock();
+ try {
+ return update.get();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Initializes the state for a new key exchange. {@link
#allPacketsFlushed()} will be {@code false}, and a new
+ * future to be fulfilled when all queued packets will be flushed once the
key exchange is done is set. The
+ * currently set future from an earlier key exchange is returned. The
returned future may or may not be fulfilled;
+ * if it isn't, there are still left-over pending packets to write from
the previous key exchange, which will be
+ * written once the new key exchange flushes pending packets.
+ *
+ * @return the previous {@link DefaultKeyExchangeFuture} indicating
whether all pending packets were flushed.
+ */
+ public DefaultKeyExchangeFuture initNewKeyExchange() {
+ return updateState(() -> {
+ kexFlushed = false;
+ DefaultKeyExchangeFuture oldFuture = kexFlushedFuture;
+ kexFlushedFuture = new
DefaultKeyExchangeFuture(session.toString(), session.getFutureLock());
+ return oldFuture;
+ });
+ }
+
+ /**
+ * To be called when the key exchange is done. If there are any pending
packets, returns a future that will be
+ * fulfilled when {@link #flushQueue(DefaultKeyExchangeFuture)} with that
future as argument has flushed all pending
+ * packets, if there are any.
+ *
+ * @return the current {@link DefaultKeyExchangeFuture} and the number of
currently pending packets
+ */
+ public SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>
terminateKeyExchange() {
+ return updateState(() -> {
+ int numPending = pendingPackets.size();
+ if (numPending == 0) {
+ kexFlushed = true;
+ }
+ return new SimpleImmutableEntry<Integer,
DefaultKeyExchangeFuture>(Integer.valueOf(numPending), kexFlushedFuture);
+ });
+ }
+
+ /**
+ * Pretends all pending packets had been written. To be called when the
{@link AbstractSession} closes.
+ */
+ public void shutdown() {
+ SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> items =
updateState(() -> {
+ kexFlushed = true;
+ return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>(
+ Integer.valueOf(pendingPackets.size()),
+ kexFlushedFuture);
+ });
+ items.getValue().setValue(Boolean.valueOf(items.getKey().intValue() ==
0));
+ flushRunner.shutdownNow();
+ }
+
+ /**
+ * Writes a packet. If a key exchange is ongoing, only low-level messages
are written directly; all other messages
+ * are queued and will be written once {@link
#flushQueue(DefaultKeyExchangeFuture)} is called when the key exchange
+ * is done. Packets written while there are still pending packets to be
flushed will either be queued, too, or the
+ * calling thread will be blocked with the given timeout until all packets
have been flushed. Whether a write will
+ * be blocked is determined by {@link #isBlockAllowed(int)}.
+ * <p>
+ * If {@code timeout <= 0} or {@code unit == null}, a time-out of
"forever" is assumed. Note that a timeout applies
+ * only if the calling thread is blocked.
+ * </p>
+ *
+ * @param buffer packet to write
+ * @param timeout number of {@link TimeUnit}s to wait at most if the
calling thread is blocked
+ * @param unit {@link TimeUnit} of {@code timeout}
+ * @return an {@link IoWriteFuture} that will be fulfilled
once the packet has indeed been written.
+ * @throws IOException if an error occurs
+ */
+ public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit
unit) throws IOException {
+ // While exchanging key, queue high level packets.
+ byte[] bufData = buffer.array();
+ int cmd = bufData[buffer.rpos()] & 0xFF;
+ boolean enqueued = false;
+ boolean isLowLevelMessage = cmd <= SshConstants.SSH_MSG_KEX_LAST &&
cmd != SshConstants.SSH_MSG_SERVICE_REQUEST
+ && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT;
+ try {
+ if (isLowLevelMessage) {
+ // Low-level messages can always be sent.
+ return session.doWritePacket(buffer);
+ }
+ IoWriteFuture future = writeOrEnqueue(cmd, buffer, timeout, unit);
+ enqueued = future instanceof PendingWriteFuture;
+ return future;
+ } finally {
+ session.resetIdleTimeout();
+ if (!enqueued) {
+ try {
+ session.checkRekey();
+ } catch (GeneralSecurityException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("writePacket({}) failed ({}) to check
re-key: {}", session, e.getClass().getSimpleName(),
+ e.getMessage(), e);
+ }
+ throw ValidateUtils.initializeExceptionCause(new
ProtocolException(
+ "Failed (" + e.getClass().getSimpleName() + ")" +
" to check re-key necessity: " + e.getMessage()),
+ e);
+ } catch (Exception e) {
+ ExceptionUtils.rethrowAsIoException(e);
+ }
+ }
+ }
+ }
+
+ private IoWriteFuture writeOrEnqueue(int cmd, Buffer buffer, long timeout,
TimeUnit unit) throws IOException {
Review Comment:
Let's make this *protected* in the spirit of letting our users override
whatever they like.
Issue Time Tracking
-------------------
Worklog Id: (was: 761480)
Time Spent: 3h 50m (was: 3h 40m)
> Deadlock on disconnection at the end of key-exchange
> ----------------------------------------------------
>
> Key: SSHD-966
> URL: https://issues.apache.org/jira/browse/SSHD-966
> Project: MINA SSHD
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Francois Ferrand
> Assignee: Lyor Goldstein
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> We are using git-repo to download projects from Gerrit server, using SSH.
> Gerrit is in version 2.16.16. which uses SSHD 2.0.0 and Mina 2.0.17 with NIO2
> backend.
> One particularity of this setup is that git-repo creates a single control
> master channel, and then downloads *lots* of Git repositories (500
> repositories, some of them relatively large), with some degree of
> parallelism. This takes a long time, lots of data, and the multiplexed
> connections are handled by gerrit in multiple threads.
> In some cases, we experience a deadlock when an error happens at the end of
> the key exchange, while sending pending packets:
> {noformat}
> Warning, the following threads are deadlocked : SSH git-upload-pack /project1
> (myuser), sshd-SshServer[df5f657]-nio2-thread-3
> "SSH git-upload-pack /project1 (myuser)" prio=1 BLOCKED
>
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
> org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
> org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
> com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
> com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> "sshd-SshServer[df5f657]-nio2-thread-3" daemon prio=5 BLOCKED
>
> org.apache.sshd.common.channel.ChannelOutputStream.close(ChannelOutputStream.java:249)
> org.apache.sshd.common.util.io.IoUtils.closeQuietly(IoUtils.java:151)
>
> org.apache.sshd.server.channel.ChannelSession.doCloseImmediately(ChannelSession.java:205)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.channel.AbstractChannel.close(AbstractChannel.java:576)
>
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:56)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:45)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable.doClose(SequentialCloseable.java:69)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.session.helpers.AbstractSession.exceptionCaught(AbstractSession.java:973)
>
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.exceptionCaught(AbstractSessionIoHandler.java:53)
>
> org.apache.sshd.common.io.nio2.Nio2Session.exceptionCaught(Nio2Session.java:186)
>
> org.apache.sshd.common.io.nio2.Nio2Session.handleWriteCycleFailure(Nio2Session.java:460)
>
> org.apache.sshd.common.io.nio2.Nio2Session$2.onFailed(Nio2Session.java:415)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$failed$1(Nio2CompletionHandler.java:46)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$627/1874891543.run(Unknown
> Source)
> java.security.AccessController.doPrivileged(Native Method)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.failed(Nio2CompletionHandler.java:45)
> sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128)
> sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
>
> sun.nio.ch.UnixAsynchronousSocketChannelImpl.implWrite(UnixAsynchronousSocketChannelImpl.java:736)
>
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:382)
>
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:399)
>
> org.apache.sshd.common.io.nio2.Nio2Session.doWriteCycle(Nio2Session.java:401)
>
> org.apache.sshd.common.io.nio2.Nio2Session.startWriting(Nio2Session.java:386)
>
> org.apache.sshd.common.io.nio2.Nio2Session.writePacket(Nio2Session.java:169)
>
> org.apache.sshd.common.session.helpers.AbstractSession.doWritePacket(AbstractSession.java:1181)
>
> org.apache.sshd.common.session.helpers.AbstractSession.sendPendingPackets(AbstractSession.java:910)
>
> org.apache.sshd.common.session.helpers.AbstractSession.handleNewKeys(AbstractSession.java:875)
>
> org.apache.sshd.common.session.helpers.AbstractSession.doHandleMessage(AbstractSession.java:606)
>
> org.apache.sshd.common.session.helpers.AbstractSession.handleMessage(AbstractSession.java:555)
>
> org.apache.sshd.common.session.helpers.AbstractSession.decode(AbstractSession.java:1527)
>
> org.apache.sshd.common.session.helpers.AbstractSession.messageReceived(AbstractSession.java:516)
>
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.messageReceived(AbstractSessionIoHandler.java:63)
>
> org.apache.sshd.common.io.nio2.Nio2Session.handleReadCycleCompletion(Nio2Session.java:339)
>
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:318)
>
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:315)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$completed$0(Nio2CompletionHandler.java:38)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$349/1690110070.run(Unknown
> Source)
> java.security.AccessController.doPrivileged(Native Method)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.completed(Nio2CompletionHandler.java:37)
> sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
> sun.nio.ch.Invoker$2.run(Invoker.java:218)
>
> sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> "SSH git-upload-pack /project2 (myuser)" prio=1 BLOCKED
>
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
> org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
> org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
> com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
> com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> {noformat}
> In AbstractSession.handleNewKeys(), there is a lock on {{pendingEvents}},
> while trying to send these packets; when sending the packets fails, an
> exception is generated, which causes the channel to get closed by calling
> ChannelOutputStream.close(), which is a synchronized method.
> At the same time, some other threads are trying to write data to the session:
> it calls ChannelOutputStream.write(), which is also a synchronized method,
> which calls AbstractSession.writePacket() which attempts to lock
> {{pendingPackets}} to queue the packets.
> *NOTE*: in our setup, we can reproduce this quite easily by increasing the
> parallelism in git-repo, reducing the values of waitTimeout
> (WAIT_FOR_SPACE_TIMEOUT), rekeyBytesLimit (REKEY_TIME_LIMIT) and
> rekeyTimeLimit (REKEY_TIME_LIMIT) configuration options.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]