[SSHD-836] Make some methods of the base Closeable implementations final
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/bba23bf7 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/bba23bf7 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/bba23bf7 Branch: refs/heads/master Commit: bba23bf70bcd7e4d5a972806a9df62669e7cda81 Parents: 7b35bb3 Author: Guillaume Nodet <gno...@apache.org> Authored: Wed Jul 25 14:38:05 2018 +0200 Committer: Guillaume Nodet <gno...@apache.org> Committed: Wed Jul 25 14:38:05 2018 +0200 ---------------------------------------------------------------------- .../agent/local/ChannelAgentForwarding.java | 9 ++- .../sshd/agent/unix/AgentForwardedChannel.java | 9 ++- .../sshd/agent/unix/ChannelAgentForwarding.java | 9 ++- .../client/channel/AbstractClientChannel.java | 2 +- .../sshd/client/channel/ChannelSession.java | 12 ++- .../sshd/common/channel/AbstractChannel.java | 43 ++++------ .../sshd/common/io/nio2/Nio2Acceptor.java | 16 ++-- .../util/closeable/AbstractCloseable.java | 6 +- .../util/closeable/AbstractInnerCloseable.java | 4 +- .../sshd/server/channel/ChannelSession.java | 8 +- .../sshd/server/forward/TcpipServerChannel.java | 84 ++++++++++---------- 11 files changed, 104 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java index e051f1e..0ca735b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java @@ -28,10 +28,10 @@ import 
org.apache.sshd.agent.SshAgentFactory; import org.apache.sshd.agent.common.AbstractAgentClient; import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; +import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; -import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; @@ -110,8 +110,11 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } @Override - public CloseFuture close(boolean immediately) { - return super.close(immediately).addListener(sshFuture -> closeImmediately0()); + protected Closeable getInnerCloseable() { + return builder() + .close(super.getInnerCloseable()) + .run(toString(), this::closeImmediately0) + .build(); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java index 1b02333..1c7b95b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.sshd.client.channel.AbstractClientChannel; +import org.apache.sshd.common.Closeable; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.Window; @@ -68,9 +69,11 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn } 
@Override - protected synchronized void doCloseImmediately() { - Socket.close(socket); - super.doCloseImmediately(); + protected Closeable getInnerCloseable() { + return builder() + .close(super.getInnerCloseable()) + .run(toString(), () -> Socket.close(socket)) + .build(); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java index 586faae..4a6a5ce 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java @@ -27,9 +27,9 @@ import java.util.concurrent.Future; import org.apache.sshd.agent.SshAgent; import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; +import org.apache.sshd.common.Closeable; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; -import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; @@ -149,8 +149,11 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } @Override - public CloseFuture close(boolean immediately) { - return super.close(immediately).addListener(sshFuture -> closeImmediately0()); + protected Closeable getInnerCloseable() { + return builder() + .close(super.getInnerCloseable()) + .run(toString(), this::closeImmediately0) + .build(); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java 
---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 1ed4518..c61e271 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -204,7 +204,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr); }) .parallel(asyncIn, asyncOut, asyncErr) - .close(new GracefulChannelCloseable()) + .close(super.getInnerCloseable()) .build(); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index b3f8c67..1b3ff6f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.concurrent.Future; +import org.apache.sshd.common.Closeable; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelAsyncInputStream; import org.apache.sshd.common.channel.ChannelAsyncOutputStream; @@ -125,7 +126,14 @@ public class ChannelSession extends AbstractClientChannel { } @Override - protected void doCloseImmediately() { + protected Closeable getInnerCloseable() { + return builder() + .close(super.getInnerCloseable()) + .run(toString(), this::closeImmediately0) + .build(); + } + + protected void closeImmediately0() { if ((pumper != null) && 
(pumperService != null) && (!pumperService.isShutdown())) { try { if (!pumper.isDone()) { @@ -147,8 +155,6 @@ public class ChannelSession extends AbstractClientChannel { pumperService = null; } } - - super.doCloseImmediately(); } protected void pumpInputStream() { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 6d3c78e..b1f018e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -537,7 +537,7 @@ public abstract class AbstractChannel log.debug("handleClose({}) SSH_MSG_CHANNEL_CLOSE", this); } - if (!eofSent.getAndSet(true)) { + if (!isEofSent()) { if (debugEnabled) { log.debug("handleClose({}) prevent sending EOF", this); } @@ -551,19 +551,15 @@ public abstract class AbstractChannel } @Override - public CloseFuture close(boolean immediately) { - if (!eofSent.getAndSet(true)) { - if (log.isDebugEnabled()) { - log.debug("close({}) prevent sending EOF", this); - } - } - - return super.close(immediately); - } - - @Override protected Closeable getInnerCloseable() { - return new GracefulChannelCloseable(); + return builder() + .sequential(new GracefulChannelCloseable(), getExecutorService()) + .run(toString(), () -> { + if (service != null) { + service.unregisterChannel(AbstractChannel.this); + } + }) + .build(); } public class GracefulChannelCloseable extends IoBaseCloseable { @@ -684,6 +680,10 @@ public abstract class AbstractChannel @Override protected void preClose() { + if (!isEofSent()) { + log.debug("close({}) prevent sending EOF", this); + } + try { signalChannelClosed(null); } finally { @@ -768,15 +768,6 @@ public 
abstract class AbstractChannel } @Override - protected void doCloseImmediately() { - if (service != null) { - service.unregisterChannel(AbstractChannel.this); - } - - super.doCloseImmediately(); - } - - @Override public IoWriteFuture writePacket(Buffer buffer) throws IOException { Session s = getSession(); if (!isClosing()) { @@ -920,16 +911,16 @@ public abstract class AbstractChannel protected abstract void doWriteExtendedData(byte[] data, int off, long len) throws IOException; protected void sendEof() throws IOException { - if (eofSent.getAndSet(true)) { + if (isClosing()) { if (log.isDebugEnabled()) { - log.debug("sendEof({}) already sent", this); + log.debug("sendEof({}) already closing or closed", this); } return; } - if (isClosing()) { + if (eofSent.getAndSet(true)) { if (log.isDebugEnabled()) { - log.debug("sendEof({}) already closing or closed", this); + log.debug("sendEof({}) already sent", this); } return; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java index 281d540..73a3c5f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java @@ -33,8 +33,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; +import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; -import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.io.IoAcceptor; import org.apache.sshd.common.io.IoHandler; import org.apache.sshd.common.util.ValidateUtils; @@ -137,13 +137,20 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor { } @Override - public 
CloseFuture close(boolean immediately) { + protected void preClose() { unbind(); - return super.close(immediately); + super.preClose(); } @Override - public void doCloseImmediately() { + protected Closeable getInnerCloseable() { + return builder() + .close(super.getInnerCloseable()) + .run(toString(), this::closeImmediately0) + .build(); + } + + protected void closeImmediately0() { Collection<SocketAddress> boundAddresses = getBoundAddresses(); boolean debugEnabled = log.isDebugEnabled(); for (SocketAddress address : boundAddresses) { @@ -163,7 +170,6 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor { } } } - super.doCloseImmediately(); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java index dffbd5b..6413ebb 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java @@ -71,7 +71,7 @@ public abstract class AbstractCloseable extends IoBaseCloseable { } @Override - public CloseFuture close(boolean immediately) { + public final CloseFuture close(boolean immediately) { boolean debugEnabled = log.isDebugEnabled(); if (immediately) { if (state.compareAndSet(State.Opened, State.Immediate) @@ -123,12 +123,12 @@ public abstract class AbstractCloseable extends IoBaseCloseable { } @Override - public boolean isClosed() { + public final boolean isClosed() { return state.get() == State.Closed; } @Override - public boolean isClosing() { + public final boolean isClosing() { return state.get() != State.Opened; } 
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java index 853033f..6518d23 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java @@ -36,13 +36,13 @@ public abstract class AbstractInnerCloseable extends AbstractCloseable { protected abstract Closeable getInnerCloseable(); @Override - protected CloseFuture doCloseGracefully() { + protected final CloseFuture doCloseGracefully() { return getInnerCloseable().close(false); } @Override @SuppressWarnings("synthetic-access") - protected void doCloseImmediately() { + protected final void doCloseImmediately() { getInnerCloseable().close(true).addListener(future -> AbstractInnerCloseable.super.doCloseImmediately()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index 78f5861..16e16b7 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -123,8 +123,9 @@ public class ChannelSession extends AbstractServerChannel { @Override protected Closeable getInnerCloseable() { return builder() - .sequential(new CommandCloseable(), new GracefulChannelCloseable()) + 
.sequential(new CommandCloseable(), super.getInnerCloseable()) .parallel(asyncOut, asyncErr) + .run(toString(), this::closeImmediately0) .build(); } @@ -190,8 +191,7 @@ public class ChannelSession extends AbstractServerChannel { } } - @Override - protected void doCloseImmediately() { + protected void closeImmediately0() { boolean debugEnabled = log.isDebugEnabled(); if (commandInstance != null) { try { @@ -223,8 +223,6 @@ public class ChannelSession extends AbstractServerChannel { } } } - - super.doCloseImmediately(); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index 663553b..e7eef13 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -27,6 +27,7 @@ import java.util.Objects; import org.apache.sshd.client.future.DefaultOpenFuture; import org.apache.sshd.client.future.OpenFuture; +import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.RuntimeSshException; import org.apache.sshd.common.SshConstants; @@ -48,6 +49,7 @@ import org.apache.sshd.common.util.Readable; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.apache.sshd.common.util.closeable.AbstractCloseable; import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.common.util.threads.ExecutorService; import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; @@ -292,51 +294,45 @@ public class TcpipServerChannel extends 
AbstractServerChannel implements Forward } @Override - public CloseFuture close(boolean immediately) { - boolean debugEnabled = log.isDebugEnabled(); - /* - * In case of graceful shutdown (e.g. when the remote channel is gently closed) - * we also need to close the ChannelOutputStream which flushes remaining buffer - * and sends SSH_MSG_CHANNEL_EOF back to the client. - */ - if ((!immediately) && (out != null)) { - try { - if (debugEnabled) { - log.debug("Closing channel output stream of {}", this); - } - - out.close(); - } catch (IOException | RuntimeException ignored) { - if (debugEnabled) { - log.debug("{} while closing channel output stream of {}: {}", - ignored.getClass().getSimpleName(), this, ignored.getMessage()); - } - } - } - - CloseFuture closingFeature = super.close(immediately); - - // We also need to dispose of the connector, but unfortunately we - // are being invoked by the connector thread or the connector's - // own processor thread. Disposing of the connector within either - // causes deadlock. Instead create a thread to dispose of the - // connector in the background. - ExecutorService service = getExecutorService(); - - // allocate a temporary executor service if none provided - ExecutorService executors = (service == null) - ? ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]") - : ThreadUtils.noClose(service); + protected Closeable getInnerCloseable() { + return builder() + .run(toString(), () -> { + /* + * In case of graceful shutdown (e.g. when the remote channel is gently closed) + * we also need to close the ChannelOutputStream which flushes remaining buffer + * and sends SSH_MSG_CHANNEL_EOF back to the client. 
+ */ + if (out != null) { + try { + log.debug("Closing channel output stream of {}", this); + out.close(); + } catch (IOException | RuntimeException ignored) { + log.debug("{} while closing channel output stream of {}: {}", + ignored.getClass().getSimpleName(), this, ignored.getMessage(), ignored); + } + } + }) + .close(super.getInnerCloseable()) + .close(new AbstractCloseable() { + ExecutorService executor = ThreadUtils.newCachedThreadPool("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]"); + @Override + protected CloseFuture doCloseGracefully() { + executor.submit(() -> { + connector.close(false); + }); + return null; + } - return builder().when(closingFeature).run(toString(), () -> { - executors.submit(() -> { - if (debugEnabled) { - log.debug("disposing connector: {} for: {}", connector, TcpipServerChannel.this); - } - connector.close(immediately) - .addListener(f -> executors.close(true)); - }); - }).build().close(false); + @Override + protected void doCloseImmediately() { + executor.submit(() -> { + connector.close(true) + .addListener(f -> executor.close(true)); + }); + super.doCloseImmediately(); + } + }) + .build(); } @Override