tomaswolf commented on code in PR #217:
URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857179032


##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that 
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages 
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send 
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the 
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key 
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7";>RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+    // With asynchronous flushing we get a classic producer-consumer problem. 
The flushing thread is the single
+    // consumer, and there is a risk that it might get overrun by the 
producers. The classical solution of using a
+    // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we 
cannot make the producers block when the queue
+    // is full; we might deadlock or be unable to handle any incoming message.
+    //
+    // We need an unbounded queue that never blocks the producers, but that 
manages to throttle them such that the
+    // flushing thread can actually finish, and we still can handle incoming 
messages (in particular also the peer's
+    // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our 
own SSH_MSG_NEW_KEYS).
+    //
+    // This is achieved by giving the flushing thread priority over the 
threads that might enqueue additional packets
+    // and flushing at least two packets at a time. Additionally the flush 
loop releases and shortly afterwards
+    // re-acquires the write lock, so normally not many readers (i.e., 
writePacket() calls) will get a chance to enqueue
+    // new packets.
+
+    /**
+     * We need the flushing thread to have priority over writing threads. So 
we use a lock that favors writers over
+     * readers, and any state updates and the flushing thread are writers, 
while writePacket() is a reader.
+     */
+    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(false);

Review Comment:
   Done.



##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that 
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages 
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send 
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the 
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key 
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7";>RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+    // With asynchronous flushing we get a classic producer-consumer problem. 
The flushing thread is the single
+    // consumer, and there is a risk that it might get overrun by the 
producers. The classical solution of using a
+    // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we 
cannot make the producers block when the queue
+    // is full; we might deadlock or be unable to handle any incoming message.
+    //
+    // We need an unbounded queue that never blocks the producers, but that 
manages to throttle them such that the
+    // flushing thread can actually finish, and we still can handle incoming 
messages (in particular also the peer's
+    // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our 
own SSH_MSG_NEW_KEYS).
+    //
+    // This is achieved by giving the flushing thread priority over the 
threads that might enqueue additional packets
+    // and flushing at least two packets at a time. Additionally the flush 
loop releases and shortly afterwards
+    // re-acquires the write lock, so normally not many readers (i.e., 
writePacket() calls) will get a chance to enqueue
+    // new packets.
+
+    /**
+     * We need the flushing thread to have priority over writing threads. So 
we use a lock that favors writers over
+     * readers, and any state updates and the flushing thread are writers, 
while writePacket() is a reader.
+     */
+    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(false);
+
+    private final ExecutorService flushRunner = 
Executors.newSingleThreadExecutor();
+
+    private final AbstractSession session;
+
+    private final Logger log;
+
+    /**
+     * Queues up high-level packets written during an ongoing key exchange.
+     */
+    private final Queue<PendingWriteFuture> pendingPackets = new 
LinkedList<>();
+
+    /**
+     * Indicates that all pending packets have been flushed.
+     */
+    private boolean kexFlushed = true;
+
+    /**
+     * Never {@code null}. Used to block some threads when writing packets 
while pending packets are still being flushed
+     * at the end of a KEX to avoid overrunning the flushing thread. Always 
set, initially fulfilled. At the beginning
+     * of a KEX a new future is installed, which is fulfilled at the end of 
the KEX once there are no more pending
+     * packets to be flushed.
+     */
+    private DefaultKeyExchangeFuture kexFlushedFuture;
+
+    /**
+     * Creates a new {@link KeyExchangeMessageHandler} for the given {@code 
session}, using the given {@code Logger}.
+     *
+     * @param session {@link AbstractSession} the new instance belongs to
+     * @param log     {@link Logger} to use for writing log messages
+     */
+    public KeyExchangeMessageHandler(AbstractSession session, Logger log) {
+        this.session = Objects.requireNonNull(session);
+        this.log = Objects.requireNonNull(log);
+        // Start with a fulfilled kexFlushed future.
+        kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(), 
session.getFutureLock());
+        kexFlushedFuture.setValue(Boolean.TRUE);
+    }
+
+    public void updateState(Runnable update) {
+        lock.writeLock().lock();
+        try {
+            update.run();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public <V> V updateState(Supplier<V> update) {
+        lock.writeLock().lock();
+        try {
+            return update.get();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Initializes the state for a new key exchange. {@link 
#allPacketsFlushed()} will be {@code false}, and a new
+     * future to be fulfilled when all queued packets will be flushed once the 
key exchange is done is set. The
+     * currently set future from an earlier key exchange is returned. The 
returned future may or may not be fulfilled;
+     * if it isn't, there are still left-over pending packets to write from 
the previous key exchange, which will be
+     * written once the new key exchange flushes pending packets.
+     *
+     * @return the previous {@link DefaultKeyExchangeFuture} indicating 
whether all pending packets were flushed.
+     */
+    public DefaultKeyExchangeFuture initNewKeyExchange() {
+        return updateState(() -> {
+            kexFlushed = false;
+            DefaultKeyExchangeFuture oldFuture = kexFlushedFuture;
+            kexFlushedFuture = new 
DefaultKeyExchangeFuture(session.toString(), session.getFutureLock());
+            return oldFuture;
+        });
+    }
+
+    /**
+     * To be called when the key exchange is done. If there are any pending 
packets, returns a future that will be
+     * fulfilled when {@link #flushQueue(DefaultKeyExchangeFuture)} with that 
future as argument has flushed all pending
+     * packets, if there are any.
+     *
+     * @return the current {@link DefaultKeyExchangeFuture} and the number of 
currently pending packets
+     */
+    public SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> 
terminateKeyExchange() {
+        return updateState(() -> {
+            int numPending = pendingPackets.size();
+            if (numPending == 0) {
+                kexFlushed = true;
+            }
+            return new SimpleImmutableEntry<Integer, 
DefaultKeyExchangeFuture>(Integer.valueOf(numPending), kexFlushedFuture);
+        });
+    }
+
+    /**
+     * Pretends all pending packets had been written. To be called when the 
{@link AbstractSession} closes.
+     */
+    public void shutdown() {
+        SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> items = 
updateState(() -> {
+            kexFlushed = true;
+            return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>(
+                    Integer.valueOf(pendingPackets.size()),
+                    kexFlushedFuture);
+        });
+        items.getValue().setValue(Boolean.valueOf(items.getKey().intValue() == 
0));
+        flushRunner.shutdownNow();
+    }
+
+    /**
+     * Writes a packet. If a key exchange is ongoing, only low-level messages 
are written directly; all other messages
+     * are queued and will be written once {@link 
#flushQueue(DefaultKeyExchangeFuture)} is called when the key exchange
+     * is done. Packets written while there are still pending packets to be 
flushed will either be queued, too, or the
+     * calling thread will be blocked with the given timeout until all packets 
have been flushed. Whether a write will
+     * be blocked is determined by {@link #isBlockAllowed(int)}.
+     * <p>
+     * If {@code timeout <= 0} or {@code unit == null}, a time-out of 
"forever" is assumed. Note that a timeout applies
+     * only if the calling thread is blocked.
+     * </p>
+     *
+     * @param  buffer      packet to write
+     * @param  timeout     number of {@link TimeUnit}s to wait at most if the 
calling thread is blocked
+     * @param  unit        {@link TimeUnit} of {@code timeout}
+     * @return             an {@link IoWriteFuture} that will be fulfilled 
once the packet has indeed been written.
+     * @throws IOException if an error occurs
+     */
+    public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit 
unit) throws IOException {
+        // While exchanging key, queue high level packets.
+        byte[] bufData = buffer.array();
+        int cmd = bufData[buffer.rpos()] & 0xFF;
+        boolean enqueued = false;
+        boolean isLowLevelMessage = cmd <= SshConstants.SSH_MSG_KEX_LAST && 
cmd != SshConstants.SSH_MSG_SERVICE_REQUEST
+                && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT;
+        try {
+            if (isLowLevelMessage) {
+                // Low-level messages can always be sent.
+                return session.doWritePacket(buffer);
+            }
+            IoWriteFuture future = writeOrEnqueue(cmd, buffer, timeout, unit);
+            enqueued = future instanceof PendingWriteFuture;
+            return future;
+        } finally {
+            session.resetIdleTimeout();
+            if (!enqueued) {
+                try {
+                    session.checkRekey();
+                } catch (GeneralSecurityException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("writePacket({}) failed ({}) to check 
re-key: {}", session, e.getClass().getSimpleName(),
+                                e.getMessage(), e);
+                    }
+                    throw ValidateUtils.initializeExceptionCause(new 
ProtocolException(
+                            "Failed (" + e.getClass().getSimpleName() + ")" + 
" to check re-key necessity: " + e.getMessage()),
+                            e);
+                } catch (Exception e) {
+                    ExceptionUtils.rethrowAsIoException(e);
+                }
+            }
+        }
+    }
+
+    private IoWriteFuture writeOrEnqueue(int cmd, Buffer buffer, long timeout, 
TimeUnit unit) throws IOException {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@mina.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@mina.apache.org
For additional commands, e-mail: dev-h...@mina.apache.org

Reply via email to