This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit ba6ef50ed101a6c1baf8d854c94e4bbc6d2117b2 Author: Lyor Goldstein <[email protected]> AuthorDate: Mon Feb 4 15:25:05 2019 +0200 [SSHD-882] Added hooks to allow users to capture and handle extended (STDERR) data sent via the ChannelSession --- CHANGES.md | 5 + README.md | 14 +++ .../org/apache/sshd/common/util/io/IoUtils.java | 21 ++++ .../sshd/common/util/io/LineOutputStream.java | 122 ++++++++++++++++++++ .../sshd/common/util/io/LineOutputStreamTest.java | 123 +++++++++++++++++++++ .../sshd/server/channel/ChannelDataReceiver.java | 11 +- .../apache/sshd/server/channel/ChannelSession.java | 90 ++++++++++++--- .../sshd/server/subsystem/sftp/SftpSubsystem.java | 5 + 8 files changed, 367 insertions(+), 24 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c1a02e4..e2221a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -184,3 +184,8 @@ can be enabled/disabled via the `progress` command: ... set the progress marker indicator ... ``` + +# Since version 2.2.0 + +* [SSHD-882](https://issues.apache.org/jira/browse/SSHD-882) - Provide hooks to allow users to register a consumer +for STDERR data sent via the `ChannelSession` - especially for the SFTP subsystem. diff --git a/README.md b/README.md index c5237a3..e193475 100644 --- a/README.md +++ b/README.md @@ -1267,6 +1267,20 @@ using a registered `SftpErrorStatusDataHandler`. The default implementation prov exception type. However, users may override it when creating the `SftpSubsystemFactory` and provide their own codes and/or messages - e.g., for debugging one can register a `DetailedSftpErrorStatusDataHandler` (see `sshd-contrib`) that "leaks" more information in the generated message. +If the registered handler implements `ChannelSessionAware` then it will also be informed of the registered `ChannelSession` when it is +provided to the `SftpSubsystem` itself. This can be used to register an extended data writer that can handle data sent via the STDERR +channel. **Note:** this feature is allowed according to [SFTP version 4 - section 3.1](https://tools.ietf.org/html/draft-ietf-secsh-filexfer-04#section-3.1): + +>> Packets are sent and received on stdout and stdin. Data sent on stderr by the server SHOULD be considered debug +>> or supplemental error information, and MAY be displayed to the user. + +however, the current code provides no built-in support for this feature. + +If registering an extended data writer then one should take care of any race conditions that may occur where (extended) data +may arrive before the handler is informed of the existence of the `ChannelSession`. For this purpose one should configure a +reasonable buffer size by setting the `channel-session-max-extdata-bufsize` property. This way, if any data arrives before the +extended data handler is registered it will be buffered (up to the specified max. size). **Note:** if a buffer size is configured +but no extended data handler is registered when channel is spawning the command then an exception will occur. ## Port forwarding diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java b/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java index 40ef9b0..910a6a3 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java @@ -149,6 +149,27 @@ public final class IoUtils { } /** + * Closes the specified {@link Closeable} resource + * + * @param c The resource to close - ignored if {@code null} + * @return The thrown {@link IOException} when {@code close()} was + * called - {@code null} if no exception was thrown (or no resource + * to close to begin with) + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public static IOException closeQuietly(Closeable c) { + if (c != null) { + try { + c.close(); + } catch (IOException e) { + return e; + } + } + + return null; + } + + /** * Closes a bunch of resources suppressing any {@link IOException}s their * {@link Closeable#close()} method may have thrown * diff --git a/sshd-contrib/src/main/java/org/apache/sshd/common/util/io/LineOutputStream.java b/sshd-contrib/src/main/java/org/apache/sshd/common/util/io/LineOutputStream.java new file mode 100644 index 0000000..b2e45d9 --- /dev/null +++ b/sshd-contrib/src/main/java/org/apache/sshd/common/util/io/LineOutputStream.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.common.util.io; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Calls the actual writing method only when LF detected in the written stream. + * <B>Note:</B> it strips CR if found before the LF + * + * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> + */ +public abstract class LineOutputStream extends OutputStream { + protected final byte[] oneByte = new byte[1]; + protected byte[] lineBuf; + protected int usedLen; + + protected LineOutputStream() { + super(); + } + + @Override + public void write(int b) throws IOException { + oneByte[0] = (byte) (b & 0xff); + write(oneByte, 0, 1); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + int lastOffset = off; + int maxOffset = off + len; + for (int curOffset = off; curOffset < maxOffset; curOffset++) { + byte ch = b[curOffset]; + if (ch != 0x0a) { + continue; + } + + // Any previous line segment ? + if (usedLen > 0) { + accumulateLineData(b, lastOffset, curOffset - lastOffset); + + // Strip CR + if (lineBuf[usedLen - 1] == 0x0d) { + usedLen--; + } + handleLine(lineBuf, 0, usedLen); + usedLen = 0; + } else { + int lineLen = curOffset - lastOffset; + // Strip CR + if ((lineLen > 0) && (b[curOffset - 1] == 0x0d)) { + lineLen--; + } + handleLine(b, lastOffset, lineLen); + } + + lastOffset = curOffset + 1; + } + + // any leftovers ? + if (lastOffset < maxOffset) { + accumulateLineData(b, lastOffset, maxOffset - lastOffset); + } + } + + protected void accumulateLineData(byte[] b, int off, int len) throws IOException { + if (len <= 0) { + return; + } + + int reqLen = usedLen + len; + if ((lineBuf == null) || (reqLen >= lineBuf.length)) { + byte[] tmp = new byte[reqLen + Byte.SIZE /* a bit extra to avoid frequent re-sizing */]; + if (usedLen > 0) { + System.arraycopy(lineBuf, 0, tmp, 0, usedLen); + } + lineBuf = tmp; + } + + System.arraycopy(b, off, lineBuf, usedLen, len); + usedLen += len; + } + + protected abstract void handleLine(byte[] buf, int offset, int len) throws IOException; + + @Override + public void close() throws IOException { + // Last line might not be LF terminated + if (usedLen > 0) { + // Strip CR + if (lineBuf[usedLen - 1] == 0x0d) { + usedLen--; + } + + handleLine(lineBuf, 0, usedLen); + usedLen = 0; + } + } +} diff --git a/sshd-contrib/src/test/java/org/apache/sshd/common/util/io/LineOutputStreamTest.java b/sshd-contrib/src/test/java/org/apache/sshd/common/util/io/LineOutputStreamTest.java new file mode 100644 index 0000000..c49f608 --- /dev/null +++ b/sshd-contrib/src/test/java/org/apache/sshd/common/util/io/LineOutputStreamTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sshd.common.util.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StreamCorruptedException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.sshd.util.test.JUnit4ClassRunnerWithParametersFactory; +import org.apache.sshd.util.test.JUnitTestSupport; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Parameterized.UseParametersRunnerFactory; + +/** + * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@RunWith(Parameterized.class) // see https://github.com/junit-team/junit/wiki/Parameterized-tests +@UseParametersRunnerFactory(JUnit4ClassRunnerWithParametersFactory.class) +public class LineOutputStreamTest extends JUnitTestSupport { + private final boolean withCR; + + public LineOutputStreamTest(boolean withCR) { + this.withCR = withCR; + } + + @Parameters(name = "CR={0}") + public static List<Object[]> parameters() { + return Arrays.asList(new Object[] {Boolean.TRUE}, new Object[] {Boolean.FALSE}); + } + + @Test + public void testLineParsing() throws IOException { + List<String> expected = new ArrayList<>(); + String prefix = getClass().getName() + "#" + getCurrentTestName() + "-"; + for (int index = 1; index < Byte.MAX_VALUE; index++) { + expected.add(prefix + index); + } + + Path targetFile = getTargetRelativeFile( + getClass().getSimpleName(), getCurrentTestName() + "-" + (withCR ? "CR" : "LF") + ".txt"); + Files.createDirectories(targetFile.getParent()); + try (OutputStream fout = Files.newOutputStream(targetFile)) { + int lineCount = 0; + for (String l : expected) { + byte[] b = l.getBytes(StandardCharsets.UTF_8); + fout.write(b); + if (withCR) { + fout.write(0x0d); + } + fout.write(0x0a); + + lineCount++; + if ((lineCount & 0x03) == 0) { + if (withCR) { + fout.write(0x0d); + } + fout.write(0x0a); + } + } + } + + List<String> actual = new ArrayList<>(expected.size()); + try (InputStream fin = Files.newInputStream(targetFile); + OutputStream lout = new LineOutputStream() { + private int lineCount; + + @Override + protected void handleLine(byte[] buf, int offset, int len) throws IOException { + lineCount++; + if (len == 0) { + return; // ignore empty lines + } + + byte lastChar = buf[offset + len - 1]; + if ((lastChar == 0x0a) || (lastChar == 0x0d)) { + throw new StreamCorruptedException("Invalid line ending at line #" + lineCount); + } + + String l = new String(buf, offset, len, StandardCharsets.UTF_8); + actual.add(l); + } + }) { + IoUtils.copy(fin, lout); + } + + assertListEquals(getCurrentTestName(), expected, actual); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[withCR=" + withCR + "]"; + } +} diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelDataReceiver.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelDataReceiver.java index 59ade13..f1cb21c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelDataReceiver.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelDataReceiver.java @@ -43,7 +43,8 @@ import java.io.IOException; public interface ChannelDataReceiver extends Closeable { /** * <p> - * Called when the server receives additional bytes from the client. + * Called when the server receives additional bytes from the client. When {@link #close()}-d + * then indicates EOF - The client will no longer send us any more data. * </p> * * <p> @@ -104,12 +105,4 @@ public interface ChannelDataReceiver extends Closeable { * @throws IOException if failed to consume the data */ int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException; - - /** - * Called to indicate EOF. The client will no longer send us any more data. - * - * @throws IOException if failed - */ - @Override - void close() throws IOException; } diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index e54e0e4..ab57cb1 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -83,7 +83,14 @@ import org.apache.sshd.server.x11.X11ForwardSupport; */ public class ChannelSession extends AbstractServerChannel { public static final List<ChannelRequestHandler> DEFAULT_HANDLERS = - Collections.<ChannelRequestHandler>singletonList(PuttyRequestHandler.INSTANCE); + Collections.<ChannelRequestHandler>singletonList(PuttyRequestHandler.INSTANCE); + + /** + * Maximum amount of extended (a.k.a. STDERR) data allowed to be accumulated + * until a {@link ChannelDataReceiver} for the data is registered + */ + public static final String MAX_EXTDATA_BUFSIZE = "channel-session-max-extdata-bufsize"; + public static final int DEFAULT_MAX_EXTDATA_BUFSIZE = 0; protected String type; protected ChannelAsyncOutputStream asyncOut; @@ -92,7 +99,9 @@ public class ChannelSession extends AbstractServerChannel { protected OutputStream err; protected Command commandInstance; protected ChannelDataReceiver receiver; - protected Buffer tempBuffer; + protected ChannelDataReceiver extendedDataWriter; + protected Buffer receiverBuffer; + protected Buffer extendedDataBuffer; protected final AtomicBoolean commandStarted = new AtomicBoolean(false); protected final StandardEnvironment env = new StandardEnvironment(); protected final CloseFuture commandExitFuture; @@ -159,11 +168,11 @@ public class ChannelSession extends AbstractServerChannel { if (immediately || (commandInstance == null)) { commandExitFuture.setClosed(); } else if (!commandExitFuture.isClosed()) { - IOException e = IoUtils.closeQuietly(receiver); + IOException e = IoUtils.closeQuietly(receiver, extendedDataWriter); boolean debugEnabled = log.isDebugEnabled(); if (e != null) { if (debugEnabled) { - log.debug("close({})[immediately={}] failed ({}) to close receiver: {}", + log.debug("close({})[immediately={}] failed ({}) to close receiver(s): {}", this, immediately, e.getClass().getSimpleName(), e.getMessage()); } } @@ -208,7 +217,7 @@ public class ChannelSession extends AbstractServerChannel { } } - IOException e = IoUtils.closeQuietly(getRemoteWindow(), out, err, receiver); + IOException e = IoUtils.closeQuietly(getRemoteWindow(), out, err, receiver, extendedDataWriter); if (e != null) { if (debugEnabled) { log.debug("doCloseImmediately({}) failed ({}) to close resources: {}", @@ -230,10 +239,10 @@ public class ChannelSession extends AbstractServerChannel { public void handleEof() throws IOException { super.handleEof(); - IOException e = IoUtils.closeQuietly(receiver); + IOException e = IoUtils.closeQuietly(receiver, extendedDataWriter); if (e != null) { if (log.isDebugEnabled()) { - log.debug("handleEof({}) failed ({}) to close receiver: {}", + log.debug("handleEof({}) failed ({}) to close receiver(s): {}", this, e.getClass().getSimpleName(), e.getMessage()); } @@ -251,24 +260,47 @@ public class ChannelSession extends AbstractServerChannel { } ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: %d", len); + int reqLen = (int) len; if (receiver != null) { - int r = receiver.data(this, data, off, (int) len); + int r = receiver.data(this, data, off, reqLen); if (r > 0) { Window wLocal = getLocalWindow(); wLocal.consumeAndCheck(r); } } else { ValidateUtils.checkTrue(len <= (Integer.MAX_VALUE - Long.SIZE), "Temporary data length exceeds int boundaries: %d", len); - if (tempBuffer == null) { - tempBuffer = new ByteArrayBuffer((int) len + Long.SIZE, false); + if (receiverBuffer == null) { + receiverBuffer = new ByteArrayBuffer(reqLen + Long.SIZE, false); } - tempBuffer.putRawBytes(data, off, (int) len); + receiverBuffer.putRawBytes(data, off, reqLen); } } @Override protected void doWriteExtendedData(byte[] data, int off, long len) throws IOException { - throw new UnsupportedOperationException("Server channel does not support extended data"); + ValidateUtils.checkTrue(len <= (Integer.MAX_VALUE - Long.SIZE), "Extended data length exceeds int boundaries: %d", len); + + if (extendedDataWriter != null) { + extendedDataWriter.data(this, data, off, (int) len); + return; + } + + int reqSize = (int) len; + int maxBufSize = PropertyResolverUtils.getIntProperty(this, MAX_EXTDATA_BUFSIZE, DEFAULT_MAX_EXTDATA_BUFSIZE); + int curBufSize = (extendedDataBuffer == null) ? 0 : extendedDataBuffer.available(); + int totalSize = curBufSize + reqSize; + if (totalSize > maxBufSize) { + if ((curBufSize <= 0) && (maxBufSize <= 0)) { + throw new UnsupportedOperationException("Session channel does not support extended data"); + } + + throw new IndexOutOfBoundsException("Extended data buffer size (" + maxBufSize + ") exceeded"); + } + + if (extendedDataBuffer == null) { + extendedDataBuffer = new ByteArrayBuffer(totalSize + Long.SIZE, false); + } + extendedDataBuffer.putRawBytes(data, off, reqSize); } @Override @@ -616,6 +648,21 @@ public class ChannelSession extends AbstractServerChannel { } /** + * A special {@link ChannelDataReceiver} that can be used to receive + * data sent as "extended" - usually STDERR. <B>Note:</B> by + * default any such data sent to the channel session causes an exception, + * but specific implementations may choose to register such a receiver + * (e.g., for custom usage of the STDERR stream). A good place in the + * code to register such a writer would be in commands that also + * implement {@code ChannelSessionAware}. + * + * @param extendedDataWriter The {@link ChannelDataReceiver}. + */ + public void setExtendedDataWriter(ChannelDataReceiver extendedDataWriter) { + this.extendedDataWriter = extendedDataWriter; + } + + /** * Called by {@link #prepareChannelCommand(String, Command)} in order to set * up the command's streams, session, file-system, exit callback, etc.. * @@ -666,6 +713,7 @@ public class ChannelSession extends AbstractServerChannel { command.setOutputStream(out); command.setErrorStream(err); } + if (this.receiver == null) { // if the command hasn't installed any ChannelDataReceiver, install the default // and give the command an InputStream @@ -679,11 +727,23 @@ public class ChannelSession extends AbstractServerChannel { command.setInputStream(recv.getIn()); } } - if (tempBuffer != null) { - Buffer buffer = tempBuffer; - tempBuffer = null; + + if (receiverBuffer != null) { + Buffer buffer = receiverBuffer; + receiverBuffer = null; doWriteData(buffer.array(), buffer.rpos(), buffer.available()); } + + if (extendedDataBuffer != null) { + if (extendedDataWriter == null) { + throw new UnsupportedOperationException("No extended data writer available though " + extendedDataBuffer.available() + " bytes accumulated"); + } + + Buffer buffer = extendedDataBuffer; + extendedDataBuffer = null; + doWriteExtendedData(buffer.array(), buffer.rpos(), buffer.available()); + } + command.setExitCallback((exitValue, exitMessage) -> { try { closeShell(exitValue); diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java index e7f1538..802b368 100644 --- a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java +++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java @@ -219,6 +219,11 @@ public class SftpSubsystem public void setChannelSession(ChannelSession session) { this.channelSession = session; session.setDataReceiver(this); + + SftpErrorStatusDataHandler errHandler = getErrorStatusDataHandler(); + if (errHandler instanceof ChannelSessionAware) { + ((ChannelSessionAware) errHandler).setChannelSession(session); + } } @Override
