lgoldstein commented on a change in pull request #123:
URL: https://github.com/apache/mina-sshd/pull/123#discussion_r411387849
##########
File path:
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
##########
@@ -107,9 +108,39 @@ protected synchronized void doWriteIfPossible(boolean resume) {
if (total > 0) {
Channel channel = getChannel();
Window remoteWindow = channel.getRemoteWindow();
- long length = Math.min(Math.min(remoteWindow.getSize(), total), remoteWindow.getPacketSize());
- if (log.isTraceEnabled()) {
- log.trace("doWriteIfPossible({})[resume={}] attempting to write {} out of {}", this, resume, length, total);
+ long length;
+ if (remoteWindow.getSize() < total && total <= remoteWindow.getPacketSize()) {
+ // do not chunk when the window is smaller than the packet size
+ length = 0;
+ // do a defensive copy in case the user reuses the buffer
+ IoWriteFutureImpl f = new IoWriteFutureImpl(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
+ f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
+ pendingWrite.set(f);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({})[resume={}] waiting for window space {}", this, resume,
+ remoteWindow.getSize());
+ }
+ } else if (total > remoteWindow.getPacketSize()) {
+ if (buffer.rpos() > 0) {
+ // do a defensive copy in case the user reuses the buffer
Review comment:
This is a lot of code in a single method. Can we separate each case into
its own (protected) method? That would make it easier to read, debug, and
maintain.
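A rough sketch of the kind of split meant here. All class and method names are hypothetical and nothing below is taken from `ChannelAsyncOutputStream`; the first condition mirrors the one in the diff, while the bodies of the "chunk" and "write all" cases are assumptions, since the diff is cut off before them.

```java
// Hypothetical sketch: one small protected method per case instead of one long method.
class ChunkingWriter {
    enum WriteMode { WAIT_FOR_WINDOW, CHUNK, WRITE_ALL }

    /** Decides which case applies; the first two conditions follow the quoted diff. */
    protected WriteMode resolveWriteMode(long total, long windowSize, long packetSize) {
        if (windowSize < total && total <= packetSize) {
            return WriteMode.WAIT_FOR_WINDOW;
        } else if (total > packetSize) {
            return WriteMode.CHUNK;
        }
        return WriteMode.WRITE_ALL;
    }

    /** Dispatches to the per-case method and returns how many bytes to write now. */
    public long lengthToWrite(long total, long windowSize, long packetSize) {
        switch (resolveWriteMode(total, windowSize, packetSize)) {
            case WAIT_FOR_WINDOW:
                return onWaitForWindow(total, windowSize);
            case CHUNK:
                return onChunk(total, windowSize, packetSize);
            default:
                return onWriteAll(total);
        }
    }

    /** Window too small for the whole payload: write nothing for now. */
    protected long onWaitForWindow(long total, long windowSize) {
        return 0L;
    }

    /** Assumed behavior: send as much as window and packet size allow. */
    protected long onChunk(long total, long windowSize, long packetSize) {
        return Math.min(Math.min(windowSize, total), packetSize);
    }

    /** Assumed behavior: everything fits, so write it all. */
    protected long onWriteAll(long total) {
        return total;
    }
}
```

Each case can then be overridden, logged, or unit-tested on its own.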
##########
File path:
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
##########
@@ -954,7 +954,9 @@ protected IoWriteFuture doWritePacket(Buffer buffer) throws IOException {
synchronized (encodeLock) {
Buffer packet = resolveOutputPacket(buffer);
IoSession networkSession = getIoSession();
- return networkSession.writePacket(packet);
+ IoWriteFuture future = networkSession.writePacket(packet);
+ buffer.rpos(buffer.wpos());
+ return future;
Review comment:
Doesn't this change the `rpos` while the session is still writing the buffer?
It feels like this should be done only **after** the packet has been written.
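To illustrate the ordering concern, here is a minimal sketch that moves the read position only once the write has completed. `SimpleBuffer` and `DeferredConsume` are hypothetical stand-ins, and `CompletableFuture` is used in place of the project's `IoWriteFuture` purely for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a buffer with read/write positions.
class SimpleBuffer {
    private int rpos;
    private final int wpos;

    SimpleBuffer(int wpos) {
        this.wpos = wpos;
    }

    int rpos() {
        return rpos;
    }

    int wpos() {
        return wpos;
    }

    void rpos(int value) {
        this.rpos = value;
    }
}

class DeferredConsume {
    /**
     * Marks the buffer as fully consumed only after the (simulated) I/O write
     * succeeds. If the write fails, the read position is left untouched.
     */
    static CompletableFuture<Void> writePacket(SimpleBuffer buffer, CompletableFuture<Void> ioWrite) {
        return ioWrite.thenRun(() -> buffer.rpos(buffer.wpos()));
    }
}
```

Whether the real write path can still read from the original buffer after `writePacket` returns is exactly the open question; the sketch only shows the "after completion" alternative.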
##########
File path: sshd-sftp/pom.xml
##########
@@ -85,6 +85,18 @@
<artifactId>jzlib</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>1.12.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>toxiproxy</artifactId>
+ <version>1.12.5</version>
+ <scope>test</scope>
+ </dependency>
Review comment:
If these 2 dependencies are always expected to have the same version,
let's use a property (e.g., `${testcontainers.version}`). This way, if we
upgrade one we will not forget to update the other (D.R.Y. principle).
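Something along these lines, as a sketch. The property name is the one suggested above; whether it is declared in this module's `pom.xml` or in the parent is an assumption left open here.

```xml
<properties>
    <!-- single place to bump the version for both artifacts -->
    <testcontainers.version>1.12.5</testcontainers.version>
</properties>

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>${testcontainers.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>toxiproxy</artifactId>
    <version>${testcontainers.version}</version>
    <scope>test</scope>
</dependency>
```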
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.OutputStreamWithChannel;
+
+/**
+ * Implements an output stream for a given remote file
+ *
+ * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
+ */
+public class SftpOutputStreamAsync extends OutputStreamWithChannel {
+
+ static class Ack {
+ int id;
+ long offset;
+ int length;
Review comment:
Should be public + final
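One possible reading of this suggestion, sketched as a standalone example: the holder keeps public, final fields that are set once in the constructor. `AckExample` is a hypothetical wrapper so the snippet compiles on its own; the field names are the ones from the diff.

```java
// Sketch only: an immutable holder with public final fields.
class AckExample {
    public static final class Ack {
        public final int id;
        public final long offset;
        public final int length;

        public Ack(int id, long offset, int length) {
            this.id = id;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return "Ack[id=" + id + ", offset=" + offset + ", length=" + length + "]";
        }
    }
}
```

With final fields, an `Ack` queued as pending cannot be altered by accident after it has been created.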
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.InputStreamWithChannel;
+
+public class SftpInputStreamAsync extends InputStreamWithChannel {
+
+ static class Ack {
+ int id;
+ long offset;
+ int length;
+
+ Ack(int id, long offset, int length) {
+ this.id = id;
+ this.offset = offset;
+ this.length = length;
+ }
+ }
+
+ private final AbstractSftpClient client;
+ private final String path;
+ private final byte[] bb = new byte[1];
+ private final int bufferSize;
+ private final long fileSize;
+ private Buffer buffer;
+ private CloseableHandle handle;
+ private long requestOffset;
+ private long clientOffset;
+ private final Deque<Ack> pendingReads = new LinkedList<>();
+ private boolean eofIndicator;
+
+ public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
+ String path, Collection<OpenMode> mode) throws IOException {
+ this.client = Objects.requireNonNull(client, "No SFTP client instance");
+ this.path = path;
+ this.handle = client.open(path, mode);
+ this.bufferSize = bufferSize;
+ this.fileSize = client.stat(handle).getSize();
+ }
+
+ public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize,
+ String path, CloseableHandle handle) {
+ this.client = Objects.requireNonNull(client, "No SFTP client instance");
+ this.path = path;
+ this.handle = handle;
+ this.bufferSize = bufferSize;
+ this.clientOffset = clientOffset;
+ this.fileSize = fileSize;
+ }
+
+ /**
+ * The client instance
+ *
+ * @return {@link SftpClient} instance used to access the remote file
+ */
+ public final AbstractSftpClient getClient() {
+ return client;
+ }
+
+ /**
+ * The remotely accessed file path
+ *
+ * @return Remote file path
+ */
+ public final String getPath() {
+ return path;
+ }
+
+ /**
+ * Check if the stream is at EOF
+ *
+ * @return <code>true</code> if all the data has been consumer
+ */
+ public boolean isEof() {
+ return eofIndicator && hasNoData();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return (handle != null) && handle.isOpen();
+ }
+
+ @Override
+ public int read() throws IOException {
+ int read = read(bb, 0, 1);
+ if (read > 0) {
+ return bb[0] & 0xFF;
+ }
+ return read;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("read(" + getPath() + ") stream closed");
+ }
+ int idx = off;
+ while (len > 0 && !eofIndicator) {
+ if (hasNoData()) {
+ fillData();
+ if (eofIndicator && (hasNoData())) {
+ break;
+ }
+ sendRequests();
+ } else {
+ int nb = Math.min(buffer.available(), len);
+ buffer.getRawBytes(b, off, nb);
+ idx += nb;
+ len -= nb;
+ clientOffset += nb;
+ }
+ }
+ int res = idx - off;
+ if (res == 0 && eofIndicator) {
+ res = -1;
+ }
+ return res;
+ }
+
+ public long transferTo(long max, WritableByteChannel out) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("transferTo(" + getPath() + ") stream closed");
+ }
+ long orgOffset = clientOffset;
+ while (!eofIndicator && max > 0) {
+ if (hasNoData()) {
+ fillData();
+ if (eofIndicator && hasNoData()) {
+ break;
+ }
+ sendRequests();
+ } else {
+ int nb = buffer.available();
+ int toRead = (int) Math.min(nb, max);
+ ByteBuffer bb = ByteBuffer.wrap(buffer.array(), buffer.rpos(), toRead);
+ while (bb.hasRemaining()) {
+ out.write(bb);
+ }
+ buffer.rpos(buffer.rpos() + toRead);
+ clientOffset += toRead;
+ max -= toRead;
+ }
+ }
+ return clientOffset - orgOffset;
+ }
+
+ @SuppressWarnings("PMD.MissingOverride")
+ public long transferTo(OutputStream out) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("transferTo(" + getPath() + ") stream closed");
+ }
+ long orgOffset = clientOffset;
+ while (!eofIndicator) {
+ if (hasNoData()) {
+ fillData();
+ if (eofIndicator && hasNoData()) {
+ break;
+ }
+ sendRequests();
+ } else {
+ int nb = buffer.available();
+ out.write(buffer.array(), buffer.rpos(), nb);
+ buffer.rpos(buffer.rpos() + nb);
+ clientOffset += nb;
+ }
+ }
+ return clientOffset - orgOffset;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("skip(" + getPath() + ") stream closed");
+ }
+ if (clientOffset == 0 && pendingReads.isEmpty()) {
+ clientOffset = n;
+ return n;
+ }
+ return super.skip(n);
+ }
+
+ boolean hasNoData() {
+ return buffer == null || buffer.available() == 0;
+ }
+
+ void sendRequests() throws IOException {
Review comment:
Let's make it protected, in accordance with our policy of allowing maximum
flexibility to users who wish to modify or enhance the behavior of the
relevant class.
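As a small illustration of why `protected` helps here: a subclass, possibly in another package, can adjust one step without copying the whole class. The class names and the numbers below are hypothetical and unrelated to the actual SFTP stream implementation.

```java
// Hypothetical base class exposing its internal steps as protected hooks.
class WindowedReader {
    private int sent;

    /** Public entry point that relies on the overridable steps below. */
    public int pump() {
        sendRequests();
        return sent;
    }

    /** Protected rather than package-private, so subclasses elsewhere can override it. */
    protected void sendRequests() {
        sent += maxPendingRequests();
    }

    /** How many requests may be in flight at once (illustrative default). */
    protected int maxPendingRequests() {
        return 4;
    }
}

// A user-side customization that only changes the pipelining depth.
class ThrottledReader extends WindowedReader {
    @Override
    protected int maxPendingRequests() {
        return 1;
    }
}
```

With package-private methods, `ThrottledReader` would have to live in the same package as the base class to do this.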
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
##########
@@ -783,10 +786,6 @@ public void rename(String oldPath, String newPath, Collection<CopyMode> options)
@Override
public int read(Handle handle, long fileOffset, byte[] dst, int dstOffset, int len, AtomicReference<Boolean> eofSignalled)
throws IOException {
- if (eofSignalled != null) {
- eofSignalled.set(null);
- }
-
Review comment:
I don't understand why these lines were removed. The idea is that, before
we do anything, we initialize the signal to the "unknown" state.
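A minimal sketch of the tri-state idea, using a plain byte array instead of a remote file. `EofSignalDemo` is hypothetical; only the `AtomicReference<Boolean>` convention (`null` meaning "unknown") comes from the removed lines.

```java
import java.util.concurrent.atomic.AtomicReference;

class EofSignalDemo {
    /**
     * Simulated read: the signal is reset to null ("unknown") before any work,
     * so a stale value from a previous call can never be mistaken for the
     * outcome of this one. It is set to TRUE only when EOF is actually detected.
     */
    static int read(byte[] src, int offset, byte[] dst, AtomicReference<Boolean> eofSignalled) {
        if (eofSignalled != null) {
            eofSignalled.set(null); // unknown until proven otherwise
        }
        int remaining = src.length - offset;
        if (remaining <= 0) {
            if (eofSignalled != null) {
                eofSignalled.set(Boolean.TRUE);
            }
            return -1;
        }
        int count = Math.min(remaining, dst.length);
        System.arraycopy(src, offset, dst, 0, count);
        return count;
    }
}
```

Without the initial reset, a caller that reuses the same reference across calls could observe the previous call's `TRUE` after a successful read.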
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
##########
@@ -0,0 +1,312 @@
+ boolean hasNoData() {
Review comment:
Let's make it protected, in accordance with our policy of allowing maximum
flexibility to users who wish to modify or enhance the behavior of the
relevant class.
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
##########
@@ -0,0 +1,312 @@
+ void sendRequests() throws IOException {
+ if (!eofIndicator) {
+ long windowSize = client.getChannel().getLocalWindow().getMaxSize();
+ while (pendingReads.size() < (int) (windowSize / bufferSize) && requestOffset < fileSize + bufferSize
+ || pendingReads.isEmpty()) {
+ Buffer buf = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+ 23 /* sftp packet */ + 16 + handle.getIdentifier().length);
+ buf.rpos(23);
+ buf.wpos(23);
+ buf.putBytes(handle.getIdentifier());
+ buf.putLong(requestOffset);
+ buf.putInt(bufferSize);
+ int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
+ pendingReads.add(new Ack(reqId, requestOffset, bufferSize));
+ requestOffset += bufferSize;
+ }
+ }
+ }
+
+ void fillData() throws IOException {
Review comment:
Let's make it protected, in accordance with our policy of allowing maximum
flexibility to users who wish to modify or enhance the behavior of the
relevant class.
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
##########
@@ -78,24 +79,8 @@ public DefaultSftpClient(ClientSession clientSession) throws IOException {
this.nameDecodingCharset = PropertyResolverUtils.getCharset(
clientSession, NAME_DECODING_CHARSET, DEFAULT_NAME_DECODING_CHARSET);
this.clientSession = Objects.requireNonNull(clientSession, "No client session");
- this.channel = clientSession.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME);
- this.channel.setOut(new OutputStream() {
- private final byte[] singleByte = new byte[1];
-
- @Override
- public void write(int b) throws IOException {
- synchronized (singleByte) {
- singleByte[0] = (byte) b;
- write(singleByte);
- }
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- data(b, off, len);
- }
- });
- this.channel.setErr(new ByteArrayOutputStream(Byte.MAX_VALUE));
+ this.channel = new SftpChannelSubsystem();
Review comment:
I think the client session should be invoked to create the channel, in
case the user provided some override. If a special flag is required to
indicate that we need some special functionality, then let's pass it to
`createSubsystemChannel`, but let's not break the convention that sessions
create channels via methods that users can override.
This also ties in with something that I am considering doing: introducing a
pluggable `ChannelFactory` so that users can more easily override channel
behavior if needed.
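A sketch of the convention being defended, with hypothetical `Demo*` types rather than the real session and channel classes: the client asks the session for the channel, so a user-supplied session subclass still gets a say in what is created.

```java
// Hypothetical minimal channel abstraction.
interface DemoChannel {
    String describe();
}

// Hypothetical session: channels are created through an overridable method.
class DemoSession {
    public DemoChannel createSubsystemChannel(String subsystem) {
        return () -> "default:" + subsystem;
    }
}

// The client never instantiates a channel directly; it delegates to the session.
class DemoClient {
    private final DemoChannel channel;

    DemoClient(DemoSession session) {
        this.channel = session.createSubsystemChannel("sftp");
    }

    DemoChannel getChannel() {
        return channel;
    }
}
```

If `DemoClient` called `new SomeChannel()` itself, an override of `createSubsystemChannel` in a custom session would be silently bypassed.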
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
##########
@@ -346,30 +355,19 @@ public long transferTo(long position, long count, WritableByteChannel target) th
}
boolean completed = false;
- boolean eof = false;
- long curPos = position;
- int bufSize = (int) Math.min(count, copySize);
- byte[] buffer = new byte[bufSize];
- long totalRead = 0L;
+ boolean eof;
+ long totalRead;
synchronized (lock) {
try {
beginBlocking("transferTo");
- while (totalRead < count && !eof) {
- int read = sftp.read(handle, curPos, buffer, 0,
- (int) Math.min(count - totalRead, buffer.length));
- if (read > 0) {
- ByteBuffer wrap = ByteBuffer.wrap(buffer, 0, read);
- while (wrap.remaining() > 0) {
- target.write(wrap);
- }
- curPos += read;
- totalRead += read;
- } else {
- eof = read == -1;
- }
- }
+ SftpInputStreamAsync input = new SftpInputStreamAsync(
+ (AbstractSftpClient) sftp,
+ copySize, position, count, getRemotePath(), handle);
Review comment:
Shouldn't this be protected by a try-with-resources block (or similar)
that closes it automatically when it is no longer needed (or when an exception
is thrown)? I see the comment that says not to close it, but what if an
exception is thrown?
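One way to reconcile "do not close the handle" with automatic cleanup, as a sketch under assumptions: the stream only borrows the handle, and its `close()` releases the stream's own state while leaving the handle open. `SharedHandle` and `BorrowingStream` are hypothetical names, not the actual SFTP classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical handle owned by someone else (e.g. the enclosing channel).
class SharedHandle {
    boolean open = true;
}

// Hypothetical stream that borrows the handle and owns only its pending requests.
class BorrowingStream implements AutoCloseable {
    private final SharedHandle handle;
    private final Deque<Integer> pendingReads = new ArrayDeque<>();

    BorrowingStream(SharedHandle handle) {
        this.handle = handle;
        pendingReads.add(1); // pretend one request is already in flight
    }

    void transfer(boolean fail) {
        if (fail) {
            throw new IllegalStateException("simulated transfer failure");
        }
    }

    int pending() {
        return pendingReads.size();
    }

    boolean handleOpen() {
        return handle.open;
    }

    /** Releases the stream's own state; the borrowed handle is deliberately left open. */
    @Override
    public void close() {
        pendingReads.clear();
    }
}
```

Used in a try-with-resources block, `close()` runs on both the normal and the exceptional path, and the caller keeps control of the handle's lifetime.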
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
##########
@@ -0,0 +1,312 @@
+ void sendRequests() throws IOException {
+ if (!eofIndicator) {
+ long windowSize =
client.getChannel().getLocalWindow().getMaxSize();
+ while (pendingReads.size() < (int) (windowSize / bufferSize) &&
requestOffset < fileSize + bufferSize
+ || pendingReads.isEmpty()) {
+ Buffer buf =
client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+ 23 /* sftp packet */ + 16 +
handle.getIdentifier().length);
+ buf.rpos(23);
+ buf.wpos(23);
+ buf.putBytes(handle.getIdentifier());
+ buf.putLong(requestOffset);
+ buf.putInt(bufferSize);
+ int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
+ pendingReads.add(new Ack(reqId, requestOffset, bufferSize));
+ requestOffset += bufferSize;
+ }
+ }
+ }
+
+ void fillData() throws IOException {
+ Ack ack = pendingReads.pollFirst();
+ if (ack != null) {
+ pollBuffer(ack);
+ if (!eofIndicator && clientOffset < ack.offset) {
+ // we are actually missing some data
+ // so request is synchronously
+ byte[] data = new byte[(int) (ack.offset - clientOffset +
buffer.available())];
+ int cur = 0;
+ int nb = (int) (ack.offset - clientOffset);
+ AtomicReference<Boolean> eof = new AtomicReference<>();
+ while (cur < nb) {
+ int dlen = client.read(handle, clientOffset, data, cur, nb
- cur, eof);
+ eofIndicator = dlen < 0 || eof.get() != null && eof.get();
+ cur += dlen;
+ }
+ buffer.getRawBytes(data, nb, buffer.available());
+ buffer = new ByteArrayBuffer(data);
+ }
+ }
+ }
+
+ void pollBuffer(Ack ack) throws IOException {
Review comment:
Let's make it protected in accordance with our policy to allow maximum
flexibility to users who wish to modify/enhance the behavior of the relevant
class
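To illustrate the kind of extension this enables, here is a minimal sketch. The class names are hypothetical stand-ins, not the real stream: a subclass can wrap the hook only if the base class exposes it as `protected`.

```java
public class ProtectedHookSketch {

    /** Hypothetical stand-in for the stream class under review. */
    static class BaseStream {
        /** Hook that subclasses may override; returns the number of bytes made available. */
        protected int pollBuffer(int requested) {
            return requested;
        }

        /** Public entry point that delegates to the overridable hook. */
        public int fill(int requested) {
            return pollBuffer(requested);
        }
    }

    /** Hypothetical user extension that counts how often the hook is invoked. */
    static class CountingStream extends BaseStream {
        int polls;

        @Override
        protected int pollBuffer(int requested) {
            polls++;
            return super.pollBuffer(requested);
        }
    }

    public static void main(String[] args) {
        CountingStream stream = new CountingStream();
        stream.fill(16);
        stream.fill(8);
        System.out.println("polls=" + stream.polls);
    }
}
```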
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
##########
@@ -1278,4 +1274,95 @@ public void unlock(Handle handle, long offset, long
length) throws IOException {
buffer.putLong(length);
checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
}
+
+ /**
+ * @param path The remote directory path
+ * @return An {@link Iterable} that can be used to iterate
over all the directory entries (unlike
+ * {@link #readDir(Handle)})
+ * @throws IOException If failed to access the remote site
+ * @see #readDir(Handle)
+ */
Review comment:
I don't think we should repeat the javadoc comment of an overridden
method - unless there is something extra to say that does not appear in the
original comment
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
##########
@@ -230,6 +227,18 @@ public long write(ByteBuffer[] srcs, int offset, int
length) throws IOException
return doWrite(buffers, -1L);
}
+ static class Ack {
+ int id;
+ long offset;
+ int length;
+
+ Ack(int id, long offset, int length) {
+ this.id = id;
+ this.offset = offset;
+ this.length = length;
+ }
+ }
Review comment:
See previous remarks regarding this identical class defined in other
locations
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
##########
@@ -410,18 +408,23 @@ public long transferFrom(ReadableByteChannel src, long
position, long count) thr
try {
beginBlocking("transferFrom");
+ SftpOutputStreamAsync output = new SftpOutputStreamAsync(
+ (AbstractSftpClient) sftp,
+ copySize, getRemotePath(), handle);
Review comment:
Shouldn't this be protected by a try-with-resources block (or similar)
that closes it automatically when no longer needed (or when an exception is
thrown)? I see the comment that says not to close, but what if an exception is
thrown?
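A minimal sketch of the pattern I have in mind, assuming the stream has to stay open on success because it shares the channel's handle: close it only if the transfer fails. `TrackedResource` and `transfer` are hypothetical stand-ins, not the actual `SftpOutputStreamAsync` API.

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseOnFailureSketch {

    /** Hypothetical stand-in for the output stream; it only records whether it was closed. */
    static class TrackedResource implements Closeable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Runs a simulated transfer and closes the resource only when the transfer throws. */
    static long transfer(TrackedResource output, boolean fail) throws IOException {
        boolean completed = false;
        try {
            if (fail) {
                throw new IOException("simulated transfer failure");
            }
            completed = true;
            return 42L;
        } finally {
            // On success the resource is deliberately left open for its owner to close.
            if (!completed) {
                output.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        TrackedResource ok = new TrackedResource();
        System.out.println(transfer(ok, false) + " closed=" + ok.closed);

        TrackedResource bad = new TrackedResource();
        try {
            transfer(bad, true);
        } catch (IOException e) {
            System.out.println(e.getMessage() + " closed=" + bad.closed);
        }
    }
}
```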
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
##########
@@ -1278,4 +1274,95 @@ public void unlock(Handle handle, long offset, long
length) throws IOException {
buffer.putLong(length);
checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
}
+
+ /**
+ * @param path The remote directory path
+ * @return An {@link Iterable} that can be used to iterate
over all the directory entries (unlike
+ * {@link #readDir(Handle)})
+ * @throws IOException If failed to access the remote site
+ * @see #readDir(Handle)
+ */
+ @Override
+ public Iterable<DirEntry> readDir(String path) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("readDir(" + path + ") client is closed");
+ }
+
+ return new SftpIterableDirEntry(this, path);
+ }
+
+ /**
+ * @param handle A directory {@link Handle}
+ * @return An {@link Iterable} that can be used to iterate
over all the directory entries (like
+ * {@link #readDir(String)}). <B>Note:</B> the
iterable instance is not re-usable - i.e., files
+ * can be iterated only <U>once</U>
+ * @throws IOException If failed to access the directory
+ */
+ @Override
+ public Iterable<DirEntry> listDir(Handle handle) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("listDir(" + handle + ") client is closed");
+ }
+
+ return new StfpIterableDirHandle(this, handle);
+ }
+
+ /**
+ * Opens an {@link FileChannel} on the specified remote path
+ *
+ * @param path The remote path
+ * @param modes The access mode(s) - if {@code null}/empty then the
{@link #DEFAULT_CHANNEL_MODES} are used
+ * @return The open {@link FileChannel} - <B>Note:</B> do not
close this owner client instance
+ * until the channel is no longer needed since it uses
the client for providing the channel's
+ * functionality.
+ * @throws IOException If failed to open the channel
+ * @see
java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel)
+ * @see
java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel)
+ */
Review comment:
See previous remark regarding repeat of javadoc comment of an
overridden method
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.InputStreamWithChannel;
+
+public class SftpInputStreamAsync extends InputStreamWithChannel {
+
+ static class Ack {
Review comment:
Let's make this class public in accordance with our policy to allow
maximum flexibility to users who wish to modify/enhance the behavior of the
relevant class
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
##########
@@ -1278,4 +1274,95 @@ public void unlock(Handle handle, long offset, long
length) throws IOException {
buffer.putLong(length);
checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
}
+
+ /**
+ * @param path The remote directory path
+ * @return An {@link Iterable} that can be used to iterate
over all the directory entries (unlike
+ * {@link #readDir(Handle)})
+ * @throws IOException If failed to access the remote site
+ * @see #readDir(Handle)
+ */
+ @Override
+ public Iterable<DirEntry> readDir(String path) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("readDir(" + path + ") client is closed");
+ }
+
+ return new SftpIterableDirEntry(this, path);
+ }
+
+ /**
+ * @param handle A directory {@link Handle}
+ * @return An {@link Iterable} that can be used to iterate
over all the directory entries (like
+ * {@link #readDir(String)}). <B>Note:</B> the
iterable instance is not re-usable - i.e., files
+ * can be iterated only <U>once</U>
+ * @throws IOException If failed to access the directory
+ */
Review comment:
See previous remark regarding repeat of javadoc comment of an
overridden method
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
##########
@@ -501,4 +484,74 @@ public int negotiateVersion(SftpVersionSelector selector)
throws IOException {
versionHolder.set(selected);
return selected;
}
+
+ private class SftpChannelSubsystem extends ChannelSubsystem {
Review comment:
Let's make it protected (and the constructor) in accordance with our
policy to allow maximum flexibility to users who wish to modify/enhance the
behavior of the relevant class
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
##########
@@ -802,9 +801,6 @@ public int read(Handle handle, long fileOffset, byte[] dst,
int dstOffset, int l
protected int checkData(
int cmd, Buffer request, int dstOffset, byte[] dst,
AtomicReference<Boolean> eofSignalled)
throws IOException {
- if (eofSignalled != null) {
- eofSignalled.set(null);
- }
Review comment:
See previous remark regarding the removal of these lines
##########
File path:
sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpPerformanceTest.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import eu.rekawek.toxiproxy.model.toxic.Latency;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.config.hosts.HostConfigEntryResolver;
+import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.client.subsystem.sftp.fs.SftpFileSystem;
+import org.apache.sshd.common.keyprovider.KeyIdentityProvider;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy;
+
+@Ignore
Review comment:
Recommend adding clarification text:
```java
@Ignore("Special class used for development only - not really a test just useful to run as such")
```
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.InputStreamWithChannel;
+
+public class SftpInputStreamAsync extends InputStreamWithChannel {
+
+ static class Ack {
+ int id;
+ long offset;
+ int length;
+
+ Ack(int id, long offset, int length) {
+ this.id = id;
+ this.offset = offset;
+ this.length = length;
+ }
+ }
+
+ private final AbstractSftpClient client;
+ private final String path;
+ private final byte[] bb = new byte[1];
+ private final int bufferSize;
+ private final long fileSize;
+ private Buffer buffer;
+ private CloseableHandle handle;
+ private long requestOffset;
+ private long clientOffset;
+ private final Deque<Ack> pendingReads = new LinkedList<>();
+ private boolean eofIndicator;
+
+ public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
+ String path, Collection<OpenMode> mode) throws
IOException {
+ this.client = Objects.requireNonNull(client, "No SFTP client
instance");
+ this.path = path;
+ this.handle = client.open(path, mode);
+ this.bufferSize = bufferSize;
+ this.fileSize = client.stat(handle).getSize();
+ }
+
+ public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
long clientOffset, long fileSize,
+ String path, CloseableHandle handle) {
+ this.client = Objects.requireNonNull(client, "No SFTP client
instance");
+ this.path = path;
+ this.handle = handle;
+ this.bufferSize = bufferSize;
+ this.clientOffset = clientOffset;
+ this.fileSize = fileSize;
+ }
+
+ /**
+ * The client instance
+ *
+ * @return {@link SftpClient} instance used to access the remote file
+ */
+ public final AbstractSftpClient getClient() {
+ return client;
+ }
+
+ /**
+ * The remotely accessed file path
+ *
+ * @return Remote file path
+ */
+ public final String getPath() {
+ return path;
+ }
+
+ /**
+ * Check if the stream is at EOF
+ *
+ * @return <code>true</code> if all the data has been consumer
+ */
+ public boolean isEof() {
+ return eofIndicator && hasNoData();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return (handle != null) && handle.isOpen();
+ }
+
+ @Override
+ public int read() throws IOException {
+ int read = read(bb, 0, 1);
+ if (read > 0) {
+ return bb[0] & 0xFF;
+ }
+ return read;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!isOpen()) {
+ throw new IOException("read(" + getPath() + ") stream closed");
+ }
+ int idx = off;
+ while (len > 0 && !eofIndicator) {
+ if (hasNoData()) {
+ fillData();
+ if (eofIndicator && (hasNoData())) {
+ break;
+ }
+ sendRequests();
+ } else {
+ int nb = Math.min(buffer.available(), len);
+ buffer.getRawBytes(b, off, nb);
+ idx += nb;
+ len -= nb;
+ clientOffset += nb;
+ }
+ }
+ int res = idx - off;
+ if (res == 0 && eofIndicator) {
+ res = -1;
+ }
+ return res;
+ }
+
+ public long transferTo(long max, WritableByteChannel out) throws
IOException {
+ if (!isOpen()) {
+ throw new IOException("transferTo(" + getPath() + ") stream
closed");
+ }
+ long orgOffset = clientOffset;
+ while (!eofIndicator && max > 0) {
+ if (hasNoData()) {
+ fillData();
+ if (eofIndicator && hasNoData()) {
+ break;
+ }
+ sendRequests();
+ } else {
+ int nb = buffer.available();
+ int toRead = (int) Math.min(nb, max);
+ ByteBuffer bb = ByteBuffer.wrap(buffer.array(), buffer.rpos(),
toRead);
+ while (bb.hasRemaining()) {
+ out.write(bb);
+ }
+ buffer.rpos(buffer.rpos() + toRead);
+ clientOffset += toRead;
+ max -= toRead;
+ }
+ }
+ return clientOffset - orgOffset;
+ }
+
+ @SuppressWarnings("PMD.MissingOverride")
Review comment:
If an `@Override` is in order here, why omit it and suppress the warning?
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.InputStreamWithChannel;
+
+public class SftpInputStreamAsync extends InputStreamWithChannel {
+
+ static class Ack {
+ int id;
+ long offset;
+ int length;
Review comment:
Should be public + final
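Something along these lines, as a sketch only: a single top-level class with immutable public fields that both `SftpInputStreamAsync` and `SftpOutputStreamAsync` could share. The name `SftpAckData` and the `toString` format are placeholders of mine.

```java
public class SftpAckData {
    // Immutable once constructed, so exposing the fields directly is safe.
    public final int id;
    public final long offset;
    public final int length;

    public SftpAckData(int id, long offset, int length) {
        this.id = id;
        this.offset = offset;
        this.length = length;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName()
                + "[id=" + id + ", offset=" + offset + ", length=" + length + "]";
    }

    public static void main(String[] args) {
        System.out.println(new SftpAckData(7, 1024L, 512));
    }
}
```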
##########
File path:
sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTransferTest.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.sftp.fs.SftpFileSystem;
+import org.junit.Test;
+
+public class SftpTransferTest extends AbstractSftpClientTestSupport {
Review comment:
Missing standard annotation
`@FixMethodOrder(MethodSorters.NAME_ASCENDING)` that ensures reproducible test
failures
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
##########
@@ -524,13 +527,13 @@ protected void endBlocking(Object actionHint, boolean
completed)
* @param reqModes The required modes - ignored if {@code null}/empty
* @throws IOException If channel not open or the required modes are not
satisfied
*/
- private void ensureOpen(Collection<SftpClient.OpenMode> reqModes) throws
IOException {
+ private void ensureOpen(Collection<OpenMode> reqModes) throws IOException {
Review comment:
Let's make it protected in accordance with our policy to allow maximum
flexibility to users.
##########
File path:
sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Objects;
+
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.OutputStreamWithChannel;
+
+/**
+ * Implements an output stream for a given remote file
+ *
+ * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
+ */
+public class SftpOutputStreamAsync extends OutputStreamWithChannel {
+
+ static class Ack {
Review comment:
Let's make it public in accordance with our policy to allow maximum
flexibility to users who wish to modify/enhance the behavior of the relevant
class.
Isn't this exactly the same class as used by `SftpInputStreamAsync`?