Repository: mina-sshd
Updated Branches:
  refs/heads/master c5b163f2b -> 154287462


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/15428746/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
new file mode 100644
index 0000000..4118ff6
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.StreamCorruptedException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.SftpVersionSelector;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.extensions.ParserUtils;
+import 
org.apache.sshd.common.subsystem.sftp.extensions.VersionsParser.Versions;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.BufferUtils;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+
+/**
+ * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
+ */
+public class DefaultSftpClient extends AbstractSftpClient {
+    private final ClientSession clientSession;
+    private final ChannelSubsystem channel;
+    private final Map<Integer, Buffer> messages = new HashMap<>();
+    private final AtomicInteger cmdId = new AtomicInteger(100);
+    private final Buffer receiveBuffer = new ByteArrayBuffer();
+    private final byte[] workBuf = new byte[Integer.BYTES];
+    private final AtomicInteger versionHolder = new AtomicInteger(0);
+    private final AtomicBoolean closing = new AtomicBoolean(false);
+    private final NavigableMap<String, byte[]> extensions = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+    private final NavigableMap<String, byte[]> exposedExtensions = 
Collections.unmodifiableNavigableMap(extensions);
+    private Charset nameDecodingCharset = DEFAULT_NAME_DECODING_CHARSET;
+
+    public DefaultSftpClient(ClientSession clientSession) throws IOException {
+        this.nameDecodingCharset = 
PropertyResolverUtils.getCharset(clientSession, NAME_DECODING_CHARSET, 
DEFAULT_NAME_DECODING_CHARSET);
+        this.clientSession = Objects.requireNonNull(clientSession, "No client 
session");
+        this.channel = 
clientSession.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME);
+        this.channel.setOut(new OutputStream() {
+            private final byte[] singleByte = new byte[1];
+            @Override
+            public void write(int b) throws IOException {
+                synchronized (singleByte) {
+                    singleByte[0] = (byte) b;
+                    write(singleByte);
+                }
+            }
+
+            @Override
+            public void write(byte[] b, int off, int len) throws IOException {
+                data(b, off, len);
+            }
+        });
+        this.channel.setErr(new ByteArrayOutputStream(Byte.MAX_VALUE));
+
+        long initializationTimeout = 
clientSession.getLongProperty(SFTP_CHANNEL_OPEN_TIMEOUT, 
DEFAULT_CHANNEL_OPEN_TIMEOUT);
+        this.channel.open().verify(initializationTimeout);
+        this.channel.onClose(() -> {
+            synchronized (messages) {
+                closing.set(true);
+                messages.notifyAll();
+            }
+
+            if (versionHolder.get() <= 0) {
+                log.warn("onClose({}) closed before version negotiated", 
channel);
+            }
+        });
+
+        try {
+            init(initializationTimeout);
+        } catch (IOException | RuntimeException e) {
+            this.channel.close(true);
+            throw e;
+        }
+    }
+
+    @Override
+    public int getVersion() {
+        return versionHolder.get();
+    }
+
+    @Override
+    public ClientSession getClientSession() {
+        return clientSession;
+    }
+
+    @Override
+    public ClientChannel getClientChannel() {
+        return channel;
+    }
+
+    @Override
+    public NavigableMap<String, byte[]> getServerExtensions() {
+        return exposedExtensions;
+    }
+
+    @Override
+    public Charset getNameDecodingCharset() {
+        return nameDecodingCharset;
+    }
+
+    @Override
+    public void setNameDecodingCharset(Charset nameDecodingCharset) {
+        this.nameDecodingCharset = Objects.requireNonNull(nameDecodingCharset, 
"No charset provided");
+    }
+
+    @Override
+    public boolean isClosing() {
+        return closing.get();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.channel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (isOpen()) {
+            this.channel.close(false);
+        }
+    }
+
+    /**
+     * Receive binary data
+     * @param buf   The buffer for the incoming data
+     * @param start Offset in buffer to place the data
+     * @param len   Available space in buffer for the data
+     * @return Actual size of received data
+     * @throws IOException If failed to receive incoming data
+     */
+    protected int data(byte[] buf, int start, int len) throws IOException {
+        Buffer incoming = new ByteArrayBuffer(buf, start, len);
+        // If we already have partial data, we need to append it to the buffer 
and use it
+        if (receiveBuffer.available() > 0) {
+            receiveBuffer.putBuffer(incoming);
+            incoming = receiveBuffer;
+        }
+
+        // Process commands
+        int rpos = incoming.rpos();
+        for (int count = 1; receive(incoming); count++) {
+            if (log.isTraceEnabled()) {
+                log.trace("data({}) Processed {} data messages", 
getClientChannel(), count);
+            }
+        }
+
+        int read = incoming.rpos() - rpos;
+        // Compact and add remaining data
+        receiveBuffer.compact();
+        if (receiveBuffer != incoming && incoming.available() > 0) {
+            receiveBuffer.putBuffer(incoming);
+        }
+
+        return read;
+    }
+
+    /**
+     * Read SFTP packets from buffer
+     *
+     * @param incoming The received {@link Buffer}
+     * @return {@code true} if data from incoming buffer was processed
+     * @throws IOException if failed to process the buffer
+     * @see #process(Buffer)
+     */
+    protected boolean receive(Buffer incoming) throws IOException {
+        int rpos = incoming.rpos();
+        int wpos = incoming.wpos();
+        ClientSession session = getClientSession();
+        session.resetIdleTimeout();
+
+        if ((wpos - rpos) > 4) {
+            int length = incoming.getInt();
+            if (length < 5) {
+                throw new IOException("Illegal sftp packet length: " + length);
+            }
+            if ((wpos - rpos) >= (length + 4)) {
+                incoming.rpos(rpos);
+                incoming.wpos(rpos + 4 + length);
+                process(incoming);
+                incoming.rpos(rpos + 4 + length);
+                incoming.wpos(wpos);
+                return true;
+            }
+        }
+        incoming.rpos(rpos);
+        return false;
+    }
+
+    /**
+     * Process an SFTP packet
+     *
+     * @param incoming The received {@link Buffer}
+     * @throws IOException if failed to process the buffer
+     */
+    protected void process(Buffer incoming) throws IOException {
+        // create a copy of the buffer in case it is being re-used
+        Buffer buffer = new ByteArrayBuffer(incoming.available() + Long.SIZE, 
false);
+        buffer.putBuffer(incoming);
+
+        int rpos = buffer.rpos();
+        int length = buffer.getInt();
+        int type = buffer.getUByte();
+        Integer id = buffer.getInt();
+        buffer.rpos(rpos);
+
+        if (log.isTraceEnabled()) {
+            log.trace("process({}) id={}, type={}, len={}",
+                      getClientChannel(), id, 
SftpConstants.getCommandMessageName(type), length);
+        }
+
+        synchronized (messages) {
+            messages.put(id, buffer);
+            messages.notifyAll();
+        }
+    }
+
+    @Override
+    public int send(int cmd, Buffer buffer) throws IOException {
+        int id = cmdId.incrementAndGet();
+        int len = buffer.available();
+        if (log.isTraceEnabled()) {
+            log.trace("send({}) cmd={}, len={}, id={}",
+                      getClientChannel(), 
SftpConstants.getCommandMessageName(cmd), len, id);
+        }
+
+        OutputStream dos = channel.getInvertedIn();
+        BufferUtils.writeInt(dos, 1 /* cmd */ + Integer.BYTES /* id */ + len, 
workBuf);
+        dos.write(cmd & 0xFF);
+        BufferUtils.writeInt(dos, id, workBuf);
+        dos.write(buffer.array(), buffer.rpos(), len);
+        dos.flush();
+        return id;
+    }
+
+    @Override
+    public Buffer receive(int id) throws IOException {
+        Integer reqId = id;
+        synchronized (messages) {
+            for (int count = 1;; count++) {
+                if (isClosing() || (!isOpen())) {
+                    throw new SshException("Channel is being closed");
+                }
+
+                Buffer buffer = messages.remove(reqId);
+                if (buffer != null) {
+                    return buffer;
+                }
+
+                try {
+                    messages.wait();
+                } catch (InterruptedException e) {
+                    throw (IOException) new 
InterruptedIOException("Interrupted while waiting for messages at iteration #" 
+ count).initCause(e);
+                }
+            }
+        }
+    }
+
+    protected Buffer read() throws IOException {
+        InputStream dis = channel.getInvertedOut();
+        int length = BufferUtils.readInt(dis, workBuf);
+        // must have at least command + length
+        if (length < (1 + Integer.BYTES)) {
+            throw new IllegalArgumentException("Bad length: " + length);
+        }
+
+        Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES, false);
+        buffer.putInt(length);
+        int nb = length;
+        while (nb > 0) {
+            int readLen = dis.read(buffer.array(), buffer.wpos(), nb);
+            if (readLen < 0) {
+                throw new IllegalArgumentException("Premature EOF while read " 
+ length + " bytes - remaining=" + nb);
+            }
+            buffer.wpos(buffer.wpos() + readLen);
+            nb -= readLen;
+        }
+
+        return buffer;
+    }
+
+    protected void init(long initializationTimeout) throws IOException {
+        ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid 
initialization timeout: %d", initializationTimeout);
+
+        // Send init packet
+        OutputStream dos = channel.getInvertedIn();
+        BufferUtils.writeInt(dos, 5 /* total length */, workBuf);
+        dos.write(SftpConstants.SSH_FXP_INIT);
+        BufferUtils.writeInt(dos, SftpConstants.SFTP_V6, workBuf);
+        dos.flush();
+
+        Buffer buffer;
+        Integer reqId;
+        synchronized (messages) {
+            /*
+             * We need to use a timeout since if the remote server does not 
support
+             * SFTP, we will not know it immediately. This is due to the fact 
that the
+             * request for the subsystem does not contain a reply as to its 
success or
+             * failure. Thus, the SFTP channel is created by the client, but 
there is
+             * no one on the other side to reply - thus the need for the 
timeout
+             */
+            for (long remainingTimeout = initializationTimeout; 
(remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) {
+                try {
+                    long sleepStart = System.nanoTime();
+                    messages.wait(remainingTimeout);
+                    long sleepEnd = System.nanoTime();
+                    long sleepDuration = sleepEnd - sleepStart;
+                    long sleepMillis = 
TimeUnit.NANOSECONDS.toMillis(sleepDuration);
+                    if (sleepMillis < 1L) {
+                        remainingTimeout--;
+                    } else {
+                        remainingTimeout -= sleepMillis;
+                    }
+                } catch (InterruptedException e) {
+                    throw (IOException) new 
InterruptedIOException("Interrupted init()").initCause(e);
+                }
+            }
+
+            if (isClosing() || (!isOpen())) {
+                throw new EOFException("Closing while await init message");
+            }
+
+            if (messages.isEmpty()) {
+                throw new SocketTimeoutException("No incoming initialization 
response received within " + initializationTimeout + " msec.");
+            }
+
+            Collection<Integer> ids = messages.keySet();
+            Iterator<Integer> iter = ids.iterator();
+            reqId = iter.next();
+            buffer = messages.remove(reqId);
+        }
+
+        int length = buffer.getInt();
+        int type = buffer.getUByte();
+        int id = buffer.getInt();
+        if (log.isTraceEnabled()) {
+            log.trace("init({}) id={} type={} len={}",
+                      getClientChannel(), id, 
SftpConstants.getCommandMessageName(type), length);
+        }
+
+        if (type == SftpConstants.SSH_FXP_VERSION) {
+            if (id < SftpConstants.SFTP_V3) {
+                throw new SshException("Unsupported sftp version " + id);
+            }
+            versionHolder.set(id);
+
+            if (log.isTraceEnabled()) {
+                log.trace("init({}) version={}", getClientChannel(), 
versionHolder);
+            }
+
+            while (buffer.available() > 0) {
+                String name = buffer.getString();
+                byte[] data = buffer.getBytes();
+                if (log.isTraceEnabled()) {
+                    log.trace("init({}) added extension=", getClientChannel(), 
name);
+                }
+                extensions.put(name, data);
+            }
+        } else if (type == SftpConstants.SSH_FXP_STATUS) {
+            int substatus = buffer.getInt();
+            String msg = buffer.getString();
+            String lang = buffer.getString();
+            if (log.isTraceEnabled()) {
+                log.trace("init({})[id={}] - status: {} [{}] {}",
+                          getClientChannel(), id, 
SftpConstants.getStatusName(substatus), lang, msg);
+            }
+
+            throwStatusException(SftpConstants.SSH_FXP_INIT, id, substatus, 
msg, lang);
+        } else {
+            handleUnexpectedPacket(SftpConstants.SSH_FXP_INIT, 
SftpConstants.SSH_FXP_VERSION, id, type, length, buffer);
+        }
+    }
+
+    /**
+     * @param selector The {@link SftpVersionSelector} to use - ignored if 
{@code null}
+     * @return The selected version (may be same as current)
+     * @throws IOException If failed to negotiate
+     */
+    public int negotiateVersion(SftpVersionSelector selector) throws 
IOException {
+        int current = getVersion();
+        if (selector == null) {
+            return current;
+        }
+
+        Set<Integer> available = 
GenericUtils.asSortedSet(Collections.singleton(current));
+        Map<String, ?> parsed = getParsedServerExtensions();
+        Collection<String> extensions = 
ParserUtils.supportedExtensions(parsed);
+        if ((GenericUtils.size(extensions) > 0) && 
extensions.contains(SftpConstants.EXT_VERSION_SELECT)) {
+            Versions vers = GenericUtils.isEmpty(parsed) ? null : (Versions) 
parsed.get(SftpConstants.EXT_VERSIONS);
+            Collection<String> reported = (vers == null) ? null : 
vers.getVersions();
+            if (GenericUtils.size(reported) > 0) {
+                for (String v : reported) {
+                    if (!available.add(Integer.valueOf(v))) {
+                        continue;   // debug breakpoint
+                    }
+                }
+            }
+        }
+
+        int selected = selector.selectVersion(getClientSession(), current, new 
ArrayList<>(available));
+        if (log.isDebugEnabled()) {
+            log.debug("negotiateVersion({}) current={} {} -> {}", 
getClientChannel(), current, available, selected);
+        }
+
+        if (selected == current) {
+            return current;
+        }
+
+        if (!available.contains(selected)) {
+            throw new StreamCorruptedException("Selected version (" + selected 
+ ") not part of available: " + available);
+        }
+
+        String verVal = String.valueOf(selected);
+        Buffer buffer = new ByteArrayBuffer(Integer.BYTES + 
SftpConstants.EXT_VERSION_SELECT.length()     // extension name
+                + Integer.BYTES + verVal.length() + Byte.SIZE, false);
+        buffer.putString(SftpConstants.EXT_VERSION_SELECT);
+        buffer.putString(verVal);
+        checkCommandStatus(SftpConstants.SSH_FXP_EXTENDED, buffer);
+        versionHolder.set(selected);
+        return selected;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/15428746/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClientFactory.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClientFactory.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClientFactory.java
new file mode 100644
index 0000000..c6702f8
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClientFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+
+import org.apache.sshd.client.ClientFactoryManager;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
+import org.apache.sshd.client.subsystem.sftp.SftpFileSystem;
+import org.apache.sshd.client.subsystem.sftp.SftpFileSystemProvider;
+import org.apache.sshd.client.subsystem.sftp.SftpVersionSelector;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+
+/**
+ * TODO Add javadoc
+ *
+ * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
+ */
+public class DefaultSftpClientFactory extends AbstractLoggingBean implements 
SftpClientFactory {
+    public static final DefaultSftpClientFactory INSTANCE = new 
DefaultSftpClientFactory();
+
+    public DefaultSftpClientFactory() {
+        super();
+    }
+
+    @Override
+    public SftpClient createSftpClient(ClientSession session, 
SftpVersionSelector selector) throws IOException {
+        DefaultSftpClient client = createDefaultSftpClient(session, selector);
+        try {
+            client.negotiateVersion(selector);
+        } catch (IOException | RuntimeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("createSftpClient({}) failed ({}) to negotiate 
version: {}",
+                          session, e.getClass().getSimpleName(), 
e.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("createSftpClient(" + session + ") version 
negotiation failure details", e);
+            }
+
+            client.close();
+            throw e;
+        }
+
+        return client;
+    }
+
+    protected DefaultSftpClient createDefaultSftpClient(ClientSession session, 
SftpVersionSelector selector) throws IOException {
+        return new DefaultSftpClient(session);
+    }
+
+    @Override
+    public SftpFileSystem createSftpFileSystem(
+            ClientSession session, SftpVersionSelector selector, int 
readBufferSize, int writeBufferSize)
+                throws IOException {
+        ClientFactoryManager manager = session.getFactoryManager();
+        SftpFileSystemProvider provider = new 
SftpFileSystemProvider((SshClient) manager, selector);
+        SftpFileSystem fs = provider.newFileSystem(session);
+        fs.setReadBufferSize(readBufferSize);
+        fs.setWriteBufferSize(writeBufferSize);
+        return fs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/15428746/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/DefaultCloseableHandleTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/DefaultCloseableHandleTest.java
 
b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/DefaultCloseableHandleTest.java
index eee30c5..35c325b 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/DefaultCloseableHandleTest.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/client/subsystem/sftp/DefaultCloseableHandleTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
 import org.apache.sshd.client.subsystem.sftp.SftpClient.Handle;
+import org.apache.sshd.client.subsystem.sftp.impl.DefaultCloseableHandle;
 import org.apache.sshd.util.test.BaseTestSupport;
 import org.junit.FixMethodOrder;
 import org.junit.Test;

Reply via email to