This is an automated email from the ASF dual-hosted git repository.
lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new 66feece [SSHD-1022] Added logging to SftpInputStreamAsync
66feece is described below
commit 66feecec850beb532b4de75771ae2413c2cc5070
Author: Lyor Goldstein <[email protected]>
AuthorDate: Thu Jun 25 22:04:38 2020 +0300
[SSHD-1022] Added logging to SftpInputStreamAsync
---
.../subsystem/sftp/impl/SftpInputStreamAsync.java | 189 +++++++++++++++------
1 file changed, 137 insertions(+), 52 deletions(-)
diff --git
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
index b012220..5eddbe9 100644
---
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
+++
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
@@ -40,8 +40,11 @@ import org.apache.sshd.common.subsystem.sftp.SftpHelper;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.InputStreamWithChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SftpInputStreamAsync extends InputStreamWithChannel {
+ protected final Logger log;
protected final byte[] bb = new byte[1];
protected final int bufferSize;
protected final long fileSize;
@@ -52,12 +55,13 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
protected final Deque<SftpAckData> pendingReads = new LinkedList<>();
protected boolean eofIndicator;
- private final AbstractSftpClient client;
+ private final AbstractSftpClient clientInstance;
private final String path;
public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, Collection<OpenMode> mode) throws
IOException {
- this.client = Objects.requireNonNull(client, "No SFTP client
instance");
+ this.log = LoggerFactory.getLogger(getClass());
+ this.clientInstance = Objects.requireNonNull(client, "No SFTP client
instance");
this.path = path;
this.handle = client.open(path, mode);
this.bufferSize = bufferSize;
@@ -66,7 +70,8 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
long clientOffset, long fileSize,
String path, CloseableHandle handle) {
- this.client = Objects.requireNonNull(client, "No SFTP client
instance");
+ this.log = LoggerFactory.getLogger(getClass());
+ this.clientInstance = Objects.requireNonNull(client, "No SFTP client
instance");
this.path = path;
this.handle = handle;
this.bufferSize = bufferSize;
@@ -80,7 +85,7 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
* @return {@link SftpClient} instance used to access the remote file
*/
public final AbstractSftpClient getClient() {
- return client;
+ return clientInstance;
}
/**
@@ -120,8 +125,9 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
if (!isOpen()) {
throw new IOException("read(" + getPath() + ") stream closed");
}
+
int idx = off;
- while (len > 0 && !eofIndicator) {
+ while ((len > 0) && (!eofIndicator)) {
if (hasNoData()) {
fillData();
if (eofIndicator && (hasNoData())) {
@@ -136,8 +142,9 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
clientOffset += nb;
}
}
+
int res = off - idx;
- if (res == 0 && eofIndicator) {
+ if ((res == 0) && eofIndicator) {
res = -1;
}
return res;
@@ -149,6 +156,7 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
}
long orgOffset = clientOffset;
+ long totalRequested = max;
while ((!eofIndicator) && (max > 0L)) {
if (hasNoData()) {
fillData();
@@ -168,7 +176,12 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
max -= toRead;
}
}
- return clientOffset - orgOffset;
+
+ long numXfered = clientOffset - orgOffset;
+ if (log.isDebugEnabled()) {
+ log.debug("transferTo({}) transferred {}/{} bytes", this, numXfered,
totalRequested);
+ }
+ return numXfered;
}
@SuppressWarnings("PMD.MissingOverride")
@@ -192,7 +205,12 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
clientOffset += nb;
}
}
- return clientOffset - orgOffset;
+
+ long numXfered = clientOffset - orgOffset;
+ if (log.isDebugEnabled()) {
+ log.debug("transferTo({}) transferred {} bytes", this, numXfered);
+ }
+ return numXfered;
}
@Override
@@ -200,10 +218,15 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
if (!isOpen()) {
throw new IOException("skip(" + getPath() + ") stream closed");
}
+
if ((clientOffset == 0L) && pendingReads.isEmpty()) {
+ if (log.isDebugEnabled()) {
+ log.debug("skip({}) virtual skip of {} bytes", this, n);
+ }
clientOffset = n;
return n;
}
+
return super.skip(n);
}
@@ -212,64 +235,109 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
}
protected void sendRequests() throws IOException {
- if (!eofIndicator) {
- Channel channel = client.getChannel();
- Window localWindow = channel.getLocalWindow();
- long windowSize = localWindow.getMaxSize();
- Session session = client.getSession();
- byte[] id = handle.getIdentifier();
-
- while ((pendingReads.size() < (int) (windowSize / bufferSize)) &&
(requestOffset < (fileSize + bufferSize))
- || pendingReads.isEmpty()) {
- Buffer buf =
session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
- 23 /* sftp packet */ + 16 + id.length);
- buf.rpos(23);
- buf.wpos(23);
- buf.putBytes(id);
- buf.putLong(requestOffset);
- buf.putInt(bufferSize);
- int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
- pendingReads.add(new SftpAckData(reqId, requestOffset,
bufferSize));
- requestOffset += bufferSize;
+ if (eofIndicator) {
+ if (log.isDebugEnabled()) {
+ log.debug("sendRequests({}) EOF indicator ON", this);
}
+ return;
+ }
+
+ AbstractSftpClient client = getClient();
+ Channel channel = client.getChannel();
+ Window localWindow = channel.getLocalWindow();
+ long windowSize = localWindow.getMaxSize();
+ Session session = client.getSession();
+ byte[] id = handle.getIdentifier();
+ boolean traceEnabled = log.isTraceEnabled();
+ for (int ackIndex = 1;
+ (pendingReads.size() < (int) (windowSize / bufferSize)) &&
(requestOffset < (fileSize + bufferSize))
+ || pendingReads.isEmpty();
+ ackIndex++) {
+ Buffer buf =
session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+ 23 /* sftp packet */ + 16 + id.length);
+ buf.rpos(23);
+ buf.wpos(23);
+ buf.putBytes(id);
+ buf.putLong(requestOffset);
+ buf.putInt(bufferSize);
+ int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
+ SftpAckData ack = new SftpAckData(reqId, requestOffset,
bufferSize);
+ if (traceEnabled) {
+ log.trace("sendRequests({}) enqueue pending ack #{}: {}",
this, ackIndex, ack);
+ }
+ pendingReads.add(ack);
+ requestOffset += bufferSize;
}
}
protected void fillData() throws IOException {
SftpAckData ack = pendingReads.pollFirst();
- if (ack != null) {
- pollBuffer(ack);
- if ((!eofIndicator) && (clientOffset < ack.offset)) {
- // we are actually missing some data
- // so request is synchronously
- byte[] data = new byte[(int) (ack.offset - clientOffset +
buffer.available())];
- int cur = 0;
- int nb = (int) (ack.offset - clientOffset);
- AtomicReference<Boolean> eof = new AtomicReference<>();
- while (cur < nb) {
- int dlen = client.read(handle, clientOffset, data, cur, nb
- cur, eof);
- Boolean eofSignal = eof.getAndSet(null);
- eofIndicator = (dlen < 0) || ((eofSignal != null) &&
eofSignal.booleanValue());
- cur += dlen;
+ boolean traceEnabled = log.isTraceEnabled();
+ if (ack == null) {
+ if (traceEnabled) {
+ log.trace("fillData({}) no pending ack", this);
+ }
+ return;
+ }
+
+ if (traceEnabled) {
+ log.trace("fillData({}) process ack={}", this, ack);
+ }
+ pollBuffer(ack);
+
+ if ((!eofIndicator) && (clientOffset < ack.offset)) {
+ // we are actually missing some data
+ // so request it synchronously
+ byte[] data = new byte[(int) (ack.offset - clientOffset +
buffer.available())];
+ int nb = (int) (ack.offset - clientOffset);
+ if (traceEnabled) {
+ log.trace("fillData({}) reading {} bytes", this, nb);
+ }
+
+ AtomicReference<Boolean> eof = new AtomicReference<>();
+ SftpClient client = getClient();
+ for (int cur = 0; cur < nb;) {
+ int dlen = client.read(handle, clientOffset, data, cur, nb -
cur, eof);
+ Boolean eofSignal = eof.getAndSet(null);
+ if ((dlen < 0) || ((eofSignal != null) &&
eofSignal.booleanValue())) {
+ eofIndicator = true;
}
- buffer.getRawBytes(data, nb, buffer.available());
- buffer = new ByteArrayBuffer(data);
+ cur += dlen;
+ }
+
+ if (traceEnabled) {
+ log.trace("fillData({}) read {} bytes - EOF={}", this, nb,
eofIndicator);
}
+
+ buffer.getRawBytes(data, nb, buffer.available());
+ buffer = new ByteArrayBuffer(data);
}
}
protected void pollBuffer(SftpAckData ack) throws IOException {
+ boolean traceEnabled = log.isTraceEnabled();
+ if (traceEnabled) {
+ log.trace("pollBuffer({}) polling ack={}", this, ack);
+ }
+
+ AbstractSftpClient client = getClient();
Buffer buf = client.receive(ack.id);
int length = buf.getInt();
int type = buf.getUByte();
int id = buf.getInt();
+ if (traceEnabled) {
+ log.trace("pollBuffer({}) response={} for ack={} - len={}", this,
type, ack, length);
+ }
client.validateIncomingResponse(SshConstants.SSH_MSG_CHANNEL_DATA, id,
type, length, buf);
+
if (type == SftpConstants.SSH_FXP_DATA) {
int dlen = buf.getInt();
int rpos = buf.rpos();
buf.rpos(rpos + dlen);
Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf,
client.getVersion());
- eofIndicator = (b != null) && b.booleanValue();
+ if ((b != null) && b.booleanValue()) {
+ eofIndicator = true;
+ }
buf.rpos(rpos);
buf.wpos(rpos + dlen);
this.buffer = buf;
@@ -293,19 +361,36 @@ public class SftpInputStreamAsync extends
InputStreamWithChannel {
@Override
public void close() throws IOException {
- if (isOpen()) {
+ if (!isOpen()) {
+ return;
+ }
+
+ try {
+ boolean debugEnabled = log.isDebugEnabled();
try {
- try {
- while (!pendingReads.isEmpty()) {
- SftpAckData ack = pendingReads.removeFirst();
- pollBuffer(ack);
+ for (int ackIndex = 1; !pendingReads.isEmpty(); ackIndex++) {
+ SftpAckData ack = pendingReads.removeFirst();
+ if (debugEnabled) {
+ log.debug("close({}) process ack #{}: {}", this,
ackIndex, ack);
}
- } finally {
- handle.close();
+ pollBuffer(ack);
}
} finally {
- handle = null;
+ if (debugEnabled) {
+ log.debug("close({}) closing file handle", this);
+ }
+ handle.close();
}
+ } finally {
+ handle = null;
}
}
+
+ @Override
+ public String toString() {
+ SftpClient client = getClient();
+ return getClass().getSimpleName()
+ + "[" + client.getSession() + "]"
+ + "[" + getPath() + "]";
+ }
}