Repository: mina-sshd Updated Branches: refs/heads/SSHD-812-async [created] e05aa13c7
[SSHD-812] Make SftpSubsystem a bit more asynchronous Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/e05aa13c Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/e05aa13c Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/e05aa13c Branch: refs/heads/SSHD-812-async Commit: e05aa13c781d135105700f5aba314c4788c1c613 Parents: 4651d23 Author: Guillaume Nodet <gno...@apache.org> Authored: Mon Apr 9 10:05:32 2018 +0200 Committer: Guillaume Nodet <gno...@apache.org> Committed: Mon Apr 9 10:05:32 2018 +0200 ---------------------------------------------------------------------- .../sshd/server/channel/ChannelSession.java | 4 +- .../sftp/AbstractSftpSubsystemHelper.java | 124 +++++++------ .../server/subsystem/sftp/SftpSubsystem.java | 186 +++++++++---------- 3 files changed, 167 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index 806d6c7..bcc1e33 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -667,7 +667,9 @@ public class ChannelSession extends AbstractServerChannel { if (this.receiver == null) { // if the command hasn't installed any ChannelDataReceiver, install the default // and give the command an InputStream - if (command instanceof AsyncCommand) { + if (command instanceof ChannelDataReceiver) { + setDataReceiver((ChannelDataReceiver) command); + } else if (command instanceof AsyncCommand) { AsyncDataReceiver recv = new AsyncDataReceiver(this); setDataReceiver(recv); ((AsyncCommand) command).setIoInputStream(recv.getIn()); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java index 08213ee..027b8c2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/AbstractSftpSubsystemHelper.java @@ -312,7 +312,7 @@ public abstract class AbstractSftpSubsystemHelper } if ((proposed < low) || (proposed > hig)) { - sendStatus(BufferUtils.clear(buffer), id, failureOpcode, "Proposed version (" + proposed + ") not in supported range: " + available); + sendStatus(buffer, id, failureOpcode, "Proposed version (" + proposed + ") not in supported range: " + available); return null; } @@ -379,11 +379,11 @@ public abstract class AbstractSftpSubsystemHelper try { handle = doOpen(id, path, pflags, access, attrs); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_OPEN, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_OPEN, path); return; } - sendHandle(BufferUtils.clear(buffer), id, handle); + sendHandle(buffer, id, handle); } /** @@ -402,11 +402,11 @@ public abstract class AbstractSftpSubsystemHelper try { doClose(id, handle); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_CLOSE, handle); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_CLOSE, handle); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "", ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, "", ""); } protected abstract void doClose(int id, String handle) throws IOException; @@ -428,6 +428,7 @@ public abstract class AbstractSftpSubsystemHelper buffer.clear(); buffer.ensureCapacity(readLen + Long.SIZE /* the header */, IntUnaryOperator.identity()); + buffer.putInt(0); buffer.putByte((byte) SftpConstants.SSH_FXP_DATA); buffer.putInt(id); @@ -442,7 +443,7 @@ public abstract class AbstractSftpSubsystemHelper buffer.wpos(startPos + len); BufferUtils.updateLengthPlaceholder(buffer, lenPos, len); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READ, handle, offset, requestedLength); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READ, handle, offset, requestedLength); return; } @@ -458,11 +459,11 @@ public abstract class AbstractSftpSubsystemHelper try { doWrite(id, handle, offset, length, buffer.array(), buffer.rpos(), buffer.available()); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_WRITE, handle, offset, length); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_WRITE, handle, offset, length); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected abstract void doWrite(int id, String handle, long offset, int length, byte[] data, int doff, int remaining) throws IOException; @@ -479,11 +480,11 @@ public abstract class AbstractSftpSubsystemHelper try { attrs = doLStat(id, path, flags); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_LSTAT, path, flags); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_LSTAT, path, flags); return; } - sendAttrs(BufferUtils.clear(buffer), id, attrs); + sendAttrs(buffer, id, attrs); } protected Map<String, Object> doLStat(int id, String path, int flags) throws IOException { @@ -506,11 +507,11 @@ public abstract class AbstractSftpSubsystemHelper try { doSetStat(id, path, attrs); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_SETSTAT, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_SETSTAT, path); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doSetStat(int id, String path, Map<String, ?> attrs) throws IOException { @@ -534,11 +535,11 @@ public abstract class AbstractSftpSubsystemHelper try { attrs = doFStat(id, handle, flags); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_FSTAT, handle, flags); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_FSTAT, handle, flags); return; } - sendAttrs(BufferUtils.clear(buffer), id, attrs); + sendAttrs(buffer, id, attrs); } protected abstract Map<String, Object> doFStat(int id, String handle, int flags) throws IOException; @@ -549,11 +550,11 @@ public abstract class AbstractSftpSubsystemHelper try { doFSetStat(id, handle, attrs); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_FSETSTAT, handle, attrs); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_FSETSTAT, handle, attrs); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected abstract void doFSetStat(int id, String handle, Map<String, ?> attrs) throws IOException; @@ -573,11 +574,11 @@ public abstract class AbstractSftpSubsystemHelper getPathResolutionLinkOption(SftpConstants.SSH_FXP_OPENDIR, "", p); handle = doOpenDir(id, path, p, options); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_OPENDIR, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_OPENDIR, path); return; } - sendHandle(BufferUtils.clear(buffer), id, handle); + sendHandle(buffer, id, handle); } protected abstract String doOpenDir(int id, String path, Path p, LinkOption... options) throws IOException; @@ -597,11 +598,11 @@ public abstract class AbstractSftpSubsystemHelper doLink(id, targetPath, linkPath, symLink); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_LINK, targetPath, linkPath, symLink); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_LINK, targetPath, linkPath, symLink); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doLink(int id, String targetPath, String linkPath, boolean symLink) throws IOException { @@ -618,11 +619,11 @@ public abstract class AbstractSftpSubsystemHelper } doSymLink(id, targetPath, linkPath); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_SYMLINK, targetPath, linkPath); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_SYMLINK, targetPath, linkPath); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doSymLink(int id, String targetPath, String linkPath) throws IOException { @@ -639,11 +640,11 @@ public abstract class AbstractSftpSubsystemHelper try { doOpenSSHHardLink(id, srcFile, dstFile); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, HardLinkExtensionParser.NAME, srcFile, dstFile); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, HardLinkExtensionParser.NAME, srcFile, dstFile); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doOpenSSHHardLink(int id, String srcFile, String dstFile) throws IOException { @@ -661,11 +662,12 @@ public abstract class AbstractSftpSubsystemHelper try { info = doSpaceAvailable(id, path); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_SPACE_AVAILABLE, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_SPACE_AVAILABLE, path); return; } buffer.clear(); + buffer.putInt(0); buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY); buffer.putInt(id); SpaceAvailableExtensionInfo.encode(buffer, info); @@ -694,11 +696,11 @@ public abstract class AbstractSftpSubsystemHelper // TODO : implement text-seek - see https://tools.ietf.org/html/draft-ietf-secsh-filexfer-03#section-6.3 doTextSeek(id, handle, line); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_TEXT_SEEK, handle, line); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_TEXT_SEEK, handle, line); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected abstract void doTextSeek(int id, String handle, long line) throws IOException; @@ -709,11 +711,11 @@ public abstract class AbstractSftpSubsystemHelper try { doOpenSSHFsync(id, handle); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_EXTENDED, FsyncExtensionParser.NAME, handle); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, FsyncExtensionParser.NAME, handle); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected abstract void doOpenSSHFsync(int id, String handle) throws IOException; @@ -727,12 +729,13 @@ public abstract class AbstractSftpSubsystemHelper int blockSize = buffer.getInt(); try { buffer.clear(); + buffer.putInt(0); buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY); buffer.putInt(id); buffer.putString(SftpConstants.EXT_CHECK_FILE); doCheckFileHash(id, targetType, target, Arrays.asList(algos), startOffset, length, blockSize, buffer); } catch (Exception e) { - sendStatus(BufferUtils.clear(buffer), id, e, + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, targetType, target, algList, startOffset, length, blockSize); return; } @@ -844,12 +847,13 @@ public abstract class AbstractSftpSubsystemHelper } } catch (Exception e) { - sendStatus(BufferUtils.clear(buffer), id, e, + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, targetType, target, startOffset, length, quickCheckHash); return; } buffer.clear(); + buffer.putInt(0); buffer.putByte((byte) SftpConstants.SSH_FXP_EXTENDED_REPLY); buffer.putInt(id); buffer.putString(targetType); @@ -976,11 +980,11 @@ public abstract class AbstractSftpSubsystemHelper } l = doReadLink(id, path); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READLINK, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READLINK, path); return; } - sendLink(BufferUtils.clear(buffer), id, l); + sendLink(buffer, id, l); } protected String doReadLink(int id, String path) throws IOException { @@ -1004,11 +1008,11 @@ public abstract class AbstractSftpSubsystemHelper try { doRename(id, oldPath, newPath, flags); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_RENAME, oldPath, newPath, flags); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_RENAME, oldPath, newPath, flags); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doRename(int id, String oldPath, String newPath, int flags) throws IOException { @@ -1057,13 +1061,13 @@ public abstract class AbstractSftpSubsystemHelper try { doCopyData(id, readHandle, readOffset, readLength, writeHandle, writeOffset); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_COPY_DATA, readHandle, readOffset, readLength, writeHandle, writeOffset); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected abstract void doCopyData(int id, String readHandle, long readOffset, long readLength, String writeHandle, long writeOffset) throws IOException; @@ -1077,12 +1081,12 @@ public abstract class AbstractSftpSubsystemHelper try { doCopyFile(id, srcFile, dstFile, overwriteDestination); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_EXTENDED, SftpConstants.EXT_COPY_FILE, srcFile, dstFile, overwriteDestination); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doCopyFile(int id, String srcFile, String dstFile, boolean overwriteDestination) throws IOException { @@ -1113,11 +1117,11 @@ public abstract class AbstractSftpSubsystemHelper try { doBlock(id, handle, offset, length, mask); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_BLOCK, handle, offset, length, mask); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_BLOCK, handle, offset, length, mask); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected abstract void doBlock(int id, String handle, long offset, long length, int mask) throws IOException; @@ -1129,11 +1133,11 @@ public abstract class AbstractSftpSubsystemHelper try { doUnblock(id, handle, offset, length); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_UNBLOCK, handle, offset, length); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_UNBLOCK, handle, offset, length); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected abstract void doUnblock(int id, String handle, long offset, long length) throws IOException; @@ -1150,11 +1154,11 @@ public abstract class AbstractSftpSubsystemHelper try { attrs = doStat(id, path, flags); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_STAT, path, flags); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_STAT, path, flags); return; } - sendAttrs(BufferUtils.clear(buffer), id, attrs); + sendAttrs(buffer, id, attrs); } protected Map<String, Object> doStat(int id, String path, int flags) throws IOException { @@ -1267,11 +1271,11 @@ public abstract class AbstractSftpSubsystemHelper } } } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_REALPATH, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_REALPATH, path); return; } - sendPath(BufferUtils.clear(buffer), id, result.getKey(), attrs); + sendPath(buffer, id, result.getKey(), attrs); } protected SimpleImmutableEntry<Path, Boolean> doRealPathV6( @@ -1322,11 +1326,11 @@ public abstract class AbstractSftpSubsystemHelper try { doRemoveDirectory(id, path, IoUtils.getLinkOptions(false)); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_RMDIR, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_RMDIR, path); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doRemoveDirectory(int id, String path, LinkOption... options) throws IOException { @@ -1368,11 +1372,11 @@ public abstract class AbstractSftpSubsystemHelper try { doMakeDirectory(id, path, attrs, IoUtils.getLinkOptions(false)); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_MKDIR, path, attrs); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_MKDIR, path, attrs); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doMakeDirectory(int id, String path, Map<String, ?> attrs, LinkOption... options) throws IOException { @@ -1417,11 +1421,11 @@ public abstract class AbstractSftpSubsystemHelper */ doRemove(id, path, IoUtils.getLinkOptions(false)); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_REMOVE, path); + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_REMOVE, path); return; } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } protected void doRemove(int id, String path, LinkOption... options) throws IOException { @@ -1774,6 +1778,8 @@ public abstract class AbstractSftpSubsystemHelper } protected void sendHandle(Buffer buffer, int id, String handle) throws IOException { + buffer.clear(); + buffer.putInt(0); buffer.putByte((byte) SftpConstants.SSH_FXP_HANDLE); buffer.putInt(id); buffer.putString(handle); @@ -1781,6 +1787,8 @@ public abstract class AbstractSftpSubsystemHelper } protected void sendAttrs(Buffer buffer, int id, Map<String, ?> attributes) throws IOException { + buffer.clear(); + buffer.putInt(0); buffer.putByte((byte) SftpConstants.SSH_FXP_ATTRS); buffer.putInt(id); writeAttrs(buffer, attributes); @@ -1791,6 +1799,9 @@ public abstract class AbstractSftpSubsystemHelper //in case we are running on Windows String unixPath = link.replace(File.separatorChar, '/'); + buffer.clear(); + buffer.putInt(0); + buffer.putByte((byte) SftpConstants.SSH_FXP_NAME); buffer.putInt(id); buffer.putInt(1); // one response @@ -1814,6 +1825,9 @@ public abstract class AbstractSftpSubsystemHelper } protected void sendPath(Buffer buffer, int id, Path f, Map<String, ?> attrs) throws IOException { + buffer.clear(); + buffer.putInt(0); + buffer.putByte((byte) SftpConstants.SSH_FXP_NAME); buffer.putInt(id); buffer.putInt(1); // one reply @@ -2537,11 +2551,15 @@ public abstract class AbstractSftpSubsystemHelper getServerSession(), id, SftpConstants.getStatusName(substatus), lang, msg); } + buffer.clear(); + buffer.putInt(0); + buffer.putByte((byte) SftpConstants.SSH_FXP_STATUS); buffer.putInt(id); buffer.putInt(substatus); buffer.putString(msg); buffer.putString(lang); + send(buffer); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/e05aa13c/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java index 5cfbf01..39ebce2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java @@ -40,7 +40,6 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.common.Factory; @@ -48,6 +47,8 @@ import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.digest.BuiltinDigests; import org.apache.sshd.common.digest.DigestFactory; import org.apache.sshd.common.file.FileSystemAware; +import org.apache.sshd.common.io.IoInputStream; +import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.random.Random; import org.apache.sshd.common.subsystem.sftp.SftpConstants; import org.apache.sshd.common.subsystem.sftp.SftpHelper; @@ -61,10 +62,13 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.io.IoUtils; import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; import org.apache.sshd.common.util.threads.ThreadUtils; -import org.apache.sshd.server.Command; +import org.apache.sshd.server.AsyncCommand; +import org.apache.sshd.server.ChannelSessionAware; import org.apache.sshd.server.Environment; import org.apache.sshd.server.ExitCallback; import org.apache.sshd.server.SessionAware; +import org.apache.sshd.server.channel.ChannelDataReceiver; +import org.apache.sshd.server.channel.ChannelSession; import org.apache.sshd.server.session.ServerSession; /** @@ -74,7 +78,7 @@ import org.apache.sshd.server.session.ServerSession; */ public class SftpSubsystem extends AbstractSftpSubsystemHelper - implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier { + implements AsyncCommand, ChannelDataReceiver, ChannelSessionAware, SessionAware, FileSystemAware, ExecutorServiceCarrier { /** * Properties key for the maximum of available open handles per session. @@ -114,14 +118,13 @@ public class SftpSubsystem public static final int DEFAULT_MAX_READDIR_DATA_SIZE = 16 * 1024; protected ExitCallback callback; - protected InputStream in; - protected OutputStream out; - protected OutputStream err; + protected IoInputStream in; + protected IoOutputStream out; + protected IoOutputStream err; protected Environment env; protected Random randomizer; protected int fileHandleSize = DEFAULT_FILE_HANDLE_SIZE; protected int maxFileHandleRounds = DEFAULT_FILE_HANDLE_ROUNDS; - protected Future<?> pendingFuture; protected byte[] workBuf = new byte[Math.max(DEFAULT_FILE_HANDLE_SIZE, Integer.BYTES)]; protected FileSystem fileSystem = FileSystems.getDefault(); protected Path defaultDir = fileSystem.getPath(System.getProperty("user.dir")); @@ -129,8 +132,10 @@ public class SftpSubsystem protected int version; protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder()); protected final Map<String, Handle> handles = new HashMap<>(); + private Buffer buffer = new ByteArrayBuffer(1024); private ServerSession serverSession; + private ChannelSession channelSession; private final AtomicBoolean closed = new AtomicBoolean(false); private ExecutorService executorService; private boolean shutdownOnExit; @@ -227,75 +232,86 @@ public class SftpSubsystem @Override public void setInputStream(InputStream in) { - this.in = in; } @Override public void setOutputStream(OutputStream out) { - this.out = out; } @Override public void setErrorStream(OutputStream err) { + } + + @Override + public void setIoInputStream(IoInputStream in) { + this.in = in; + } + + @Override + public void setIoOutputStream(IoOutputStream out) { + this.out = out; + } + + @Override + public void setIoErrorStream(IoOutputStream err) { this.err = err; } @Override public void start(Environment env) throws IOException { this.env = env; - try { - ExecutorService executor = getExecutorService(); - pendingFuture = executor.submit(this); - } catch (RuntimeException e) { // e.g., RejectedExecutionException - log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.toString(), e); - throw new IOException(e); - } } @Override - public void run() { - try { - for (long count = 1L;; count++) { - int length = BufferUtils.readInt(in, workBuf, 0, workBuf.length); - ValidateUtils.checkTrue(length >= (Integer.BYTES + 1 /* command */), "Bad length to read: %d", length); - - Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES + Long.SIZE /* a bit extra */, false); - buffer.putInt(length); - for (int remainLen = length; remainLen > 0;) { - int l = in.read(buffer.array(), buffer.wpos(), remainLen); - if (l < 0) { - throw new IllegalArgumentException("Premature EOF at buffer #" + count + " while read length=" + length + " and remain=" + remainLen); - } - buffer.wpos(buffer.wpos() + l); - remainLen -= l; - } + public void setChannelSession(ChannelSession session) { + this.channelSession = session; + } - process(buffer); + @Override + public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException { + buffer.putRawBytes(buf, start, len); + while (buffer.available() >= Integer.BYTES) { + int rpos = buffer.rpos(); + int msglen = buffer.getInt(); + if (buffer.available() >= msglen) { + Buffer b = new ByteArrayBuffer(msglen + Integer.BYTES + Long.SIZE /* a bit extra */, false); + b.putInt(msglen); + b.putRawBytes(buffer.array(), buffer.rpos(), msglen); + buffer.rpos(buffer.rpos() + msglen); + getExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + process(b); + channelSession.getLocalWindow().consumeAndCheck(msglen + Integer.BYTES); + } catch (IOException e) { + serverSession.exceptionCaught(e); + } + } + }); + } else { + buffer.rpos(rpos); + break; } - } catch (Throwable t) { - if ((!closed.get()) && (!(t instanceof EOFException))) { // Ignore - log.error("run({}) {} caught in SFTP subsystem: {}", - getServerSession(), t.getClass().getSimpleName(), t.getMessage()); - if (log.isDebugEnabled()) { - log.debug("run(" + getServerSession() + ") caught exception details", t); + } + return 0; + } + + @Override + public void close() throws IOException { + boolean debugEnabled = log.isDebugEnabled(); + handles.forEach((id, handle) -> { + try { + handle.close(); + if (debugEnabled) { + log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle); } + } catch (IOException ioe) { + log.error("run({}) failed ({}) to close handle={}[{}]: {}", + getServerSession(), ioe.getClass().getSimpleName(), id, handle, ioe.getMessage()); } - } finally { - boolean debugEnabled = log.isDebugEnabled(); - handles.forEach((id, handle) -> { - try { - handle.close(); - if (debugEnabled) { - log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle); - } - } catch (IOException ioe) { - log.error("run({}) failed ({}) to close handle={}[{}]: {}", - getServerSession(), ioe.getClass().getSimpleName(), id, handle, ioe.getMessage()); - } - }); - - callback.onExit(0); - } + }); + callback.onExit(0); } @Override @@ -383,7 +399,7 @@ public class SftpSubsystem String name = SftpConstants.getCommandMessageName(type); log.warn("process({})[length={}, type={}, id={}] unknown command", getServerSession(), length, name, id); - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented"); + sendStatus(buffer, id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented"); } } @@ -428,7 +444,7 @@ public class SftpSubsystem if (log.isDebugEnabled()) { log.debug("executeExtendedCommand({}) received unsupported SSH_FXP_EXTENDED({})", getServerSession(), extension); } - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented"); + sendStatus(buffer, id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented"); break; } } @@ -602,7 +618,7 @@ public class SftpSubsystem * channel. */ if (requestsCount > 0L) { - sendStatus(BufferUtils.clear(buffer), id, + sendStatus(buffer, id, SftpConstants.SSH_FX_FAILURE, "Version selection not the 1st request for proposal = " + proposed); session.close(true); @@ -619,9 +635,9 @@ public class SftpSubsystem } if (result) { version = Integer.parseInt(proposed); - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, ""); + sendStatus(buffer, id, SftpConstants.SSH_FX_OK, ""); } else { - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed); + sendStatus(buffer, id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed); session.close(true); } } @@ -752,11 +768,10 @@ public class SftpSubsystem getServerSession(), id, handle, h); } - Buffer reply = null; try { DirectoryHandle dh = validateHandle(handle, h, DirectoryHandle.class); if (dh.isDone()) { - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Directory reading is done"); + sendStatus(buffer, id, SftpConstants.SSH_FX_EOF, "Directory reading is done"); return; } @@ -781,40 +796,37 @@ public class SftpSubsystem // Send only a few files at a time to not create packets of a too // large size or have a timeout to occur. - reply = BufferUtils.clear(buffer); - reply.putByte((byte) SftpConstants.SSH_FXP_NAME); - reply.putInt(id); + buffer.clear(); + buffer.putInt(0); + + buffer.putByte((byte) SftpConstants.SSH_FXP_NAME); + buffer.putInt(id); - int lenPos = reply.wpos(); - reply.putInt(0); + int lenPos = buffer.wpos(); + buffer.putInt(0); ServerSession session = getServerSession(); int maxDataSize = session.getIntProperty(MAX_READDIR_DATA_SIZE_PROP, DEFAULT_MAX_READDIR_DATA_SIZE); - int count = doReadDir(id, handle, dh, reply, maxDataSize, IoUtils.getLinkOptions(false)); - BufferUtils.updateLengthPlaceholder(reply, lenPos, count); + int count = doReadDir(id, handle, dh, buffer, maxDataSize, IoUtils.getLinkOptions(false)); + BufferUtils.updateLengthPlaceholder(buffer, lenPos, count); if ((!dh.isSendDot()) && (!dh.isSendDotDot()) && (!dh.hasNext())) { dh.markDone(); } Boolean indicator = - SftpHelper.indicateEndOfNamesList(reply, getVersion(), session, dh.isDone()); + SftpHelper.indicateEndOfNamesList(buffer, getVersion(), session, dh.isDone()); if (debugEnabled) { log.debug("doReadDir({})({})[{}] - seding {} entries - eol={}", session, handle, h, count, indicator); } + send(buffer); } else { // empty directory dh.markDone(); - sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Empty directory"); - return; + sendStatus(buffer, id, SftpConstants.SSH_FX_EOF, "Empty directory"); } - - Objects.requireNonNull(reply, "No reply buffer created"); } catch (IOException | RuntimeException e) { - sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READDIR, handle); - return; + sendStatus(buffer, id, e, SftpConstants.SSH_FXP_READDIR, handle); } - - send(reply); } @Override @@ -990,6 +1002,7 @@ public class SftpSubsystem } buffer.clear(); + buffer.putInt(0); buffer.putByte((byte) SftpConstants.SSH_FXP_VERSION); buffer.putInt(version); @@ -1003,10 +1016,8 @@ public class SftpSubsystem @Override protected void send(Buffer buffer) throws IOException { - int len = buffer.available(); - BufferUtils.writeInt(out, len, workBuf, 0, workBuf.length); - out.write(buffer.array(), buffer.rpos(), len); - out.flush(); + BufferUtils.updateLengthPlaceholder(buffer, 0); + out.writePacket(buffer); } @Override @@ -1032,17 +1043,6 @@ public class SftpSubsystem } } - // if thread has not completed, cancel it - if ((pendingFuture != null) && (!pendingFuture.isDone())) { - boolean result = pendingFuture.cancel(true); - // TODO consider waiting some reasonable (?) amount of time for cancellation - if (debugEnabled) { - log.debug("destroy(" + session + ") - cancel pending future=" + result); - } - } - - pendingFuture = null; - ExecutorService executors = getExecutorService(); if ((executors != null) && (!executors.isShutdown()) && isShutdownOnExit()) { Collection<Runnable> runners = executors.shutdownNow();