Repository: mina-sshd Updated Branches: refs/heads/master 063038150 -> 8136bf615
[SSHD-779] Attach pending packets write-completion listeners to their respective write-future(s) outside the encode locking block Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/8136bf61 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/8136bf61 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/8136bf61 Branch: refs/heads/master Commit: 8136bf615f63213af50f47584b08e618202008d5 Parents: 0630381 Author: Goldstein Lyor <[email protected]> Authored: Sun Dec 17 07:59:04 2017 +0200 Committer: Goldstein Lyor <[email protected]> Committed: Wed Dec 20 11:06:47 2017 +0200 ---------------------------------------------------------------------- .../helpers/AbstractConnectionService.java | 15 ++--- .../common/session/helpers/AbstractSession.java | 58 +++++++++++++++----- 2 files changed, 52 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8136bf61/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java index bcde65b..8f79530 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java @@ -34,6 +34,7 @@ import java.util.function.IntUnaryOperator; import org.apache.sshd.agent.common.AgentForwardSupport; import org.apache.sshd.agent.common.DefaultAgentForwardSupport; import org.apache.sshd.client.channel.AbstractClientChannel; +import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.NamedFactory; @@ -622,7 +623,8 @@ public abstract class AbstractConnectionService<S extends AbstractSession> } int channelId = registerChannel(channel); - channel.open(sender, rwsize, rmpsize, buffer).addListener(future -> { + OpenFuture openFuture = channel.open(sender, rwsize, rmpsize, buffer); + openFuture.addListener(future -> { try { if (future.isOpened()) { Window window = channel.getLocalWindow(); @@ -683,7 +685,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> * @param buffer The request {@link Buffer} * @throws Exception If failed to process the request */ - protected void globalRequest(Buffer buffer) throws Exception { + protected IoWriteFuture globalRequest(Buffer buffer) throws Exception { String req = buffer.getString(); boolean wantReply = buffer.getBoolean(); if (log.isDebugEnabled()) { @@ -715,18 +717,17 @@ public abstract class AbstractConnectionService<S extends AbstractSession> this, handler.getClass().getSimpleName(), req, wantReply, result); } } else { - sendGlobalResponse(buffer, req, result, wantReply); - return; + return sendGlobalResponse(buffer, req, result, wantReply); } } } - handleUnknownRequest(buffer, req, wantReply); + return handleUnknownRequest(buffer, req, wantReply); } - protected void handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException { + protected IoWriteFuture handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException { log.warn("handleUnknownRequest({}) unknown global request: {}", this, req); - sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply); + return sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply); } protected IoWriteFuture sendGlobalResponse(Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) throws IOException { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8136bf61/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 940b830..474eda6 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -66,6 +66,7 @@ import org.apache.sshd.common.forward.PortForwardingEventListener; import org.apache.sshd.common.future.DefaultKeyExchangeFuture; import org.apache.sshd.common.future.DefaultSshFuture; import org.apache.sshd.common.future.KeyExchangeFuture; +import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.kex.AbstractKexFactoryManager; @@ -844,20 +845,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } signalSessionEvent(SessionListener.Event.KeyEstablished); + Collection<? extends Map.Entry<? extends SshFutureListener<IoWriteFuture>, IoWriteFuture>> pendingWrites; synchronized (pendingPackets) { - if (!pendingPackets.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("handleNewKeys({}) Dequeing {} pending packets", this, pendingPackets.size()); - } - synchronized (encodeLock) { - for (PendingWriteFuture future = pendingPackets.poll(); - future != null; - future = pendingPackets.poll()) { - doWritePacket(future.getBuffer()).addListener(future); - } + pendingWrites = sendPendingPackets(pendingPackets); + kexState.set(KexState.DONE); + } + + int pendingCount = pendingWrites.size(); + if (pendingCount > 0) { + if (log.isDebugEnabled()) { + log.debug("handleNewKeys({}) sent {} pending packets", this, pendingCount); + } + + for (Map.Entry<? extends SshFutureListener<IoWriteFuture>, IoWriteFuture> pe : pendingWrites) { + SshFutureListener<IoWriteFuture> listener = pe.getKey(); + IoWriteFuture future = pe.getValue(); + if (listener != null) { + future.addListener(listener); } } - kexState.set(KexState.DONE); } synchronized (lock) { @@ -865,6 +871,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } } + protected List<Pair<PendingWriteFuture, IoWriteFuture>> sendPendingPackets(Queue<PendingWriteFuture> packetsQueue) throws IOException { + if (GenericUtils.isEmpty(packetsQueue)) { + return Collections.emptyList(); + } + + int numPending = packetsQueue.size(); + List<Pair<PendingWriteFuture, IoWriteFuture>> pendingWrites = new ArrayList<>(numPending); + synchronized (encodeLock) { + for (PendingWriteFuture future = pendingPackets.poll(); + future != null; + future = pendingPackets.poll()) { + IoWriteFuture writeFuture = doWritePacket(future.getBuffer()); + pendingWrites.add(new Pair<>(future, writeFuture)); + } + } + + return pendingWrites; + } + protected void validateKexState(int cmd, KexState expected) { KexState actual = kexState.get(); if (!expected.equals(actual)) { @@ -1034,11 +1059,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen @Override public <T extends Service> T getService(Class<T> clazz) { - for (Service s : getServices()) { + Collection<? extends Service> registeredServices = getServices(); + ValidateUtils.checkState(GenericUtils.isNotEmpty(registeredServices), "No registered services to look for %s", clazz.getSimpleName()); + + for (Service s : registeredServices) { if (clazz.isInstance(s)) { return clazz.cast(s); } } + throw new IllegalStateException("Attempted to access unknown service " + clazz.getSimpleName()); } @@ -1084,7 +1113,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } future.setValue(t); }, timeout, unit); - future.addListener(future1 -> sched.cancel(false)); + future.addListener(f -> sched.cancel(false)); return writeFuture; } @@ -1176,13 +1205,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen } Object result; + boolean traceEnabled = log.isTraceEnabled(); synchronized (requestLock) { try { writePacket(buffer); synchronized (requestResult) { while (isOpen() && (maxWaitMillis > 0L) && (requestResult.get() == null)) { - if (log.isTraceEnabled()) { + if (traceEnabled) { log.trace("request({})[{}] remaining wait={}", this, request, maxWaitMillis); }
