[SSHD-839] Added more detailed log messages in PortForwardingLoadTest
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/aa7547b7 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/aa7547b7 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/aa7547b7 Branch: refs/heads/master Commit: aa7547b7b58e49c59302407e9c8f765fa22de695 Parents: 6ddbe1a Author: Goldstein Lyor <[email protected]> Authored: Wed Aug 22 14:09:38 2018 +0300 Committer: Goldstein Lyor <[email protected]> Committed: Sun Aug 26 08:50:42 2018 +0300 ---------------------------------------------------------------------- .../common/forward/PortForwardingLoadTest.java | 124 ++++++++++++++----- .../sshd/common/forward/PortForwardingTest.java | 57 +++++---- 2 files changed, 123 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa7547b7/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java index dd076ae..dc90b6d 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingLoadTest.java @@ -77,42 +77,48 @@ public class PortForwardingLoadTest extends BaseTestSupport { @SuppressWarnings({ "checkstyle:anoninnerlength", "synthetic-access" }) private final PortForwardingEventListener serverSideListener = new PortForwardingEventListener() { @Override - public void establishingExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local, - SshdSocketAddress remote, boolean localForwarding) throws IOException { + public void establishingExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding) + throws IOException { log.info("establishingExplicitTunnel(session={}, local={}, remote={}, localForwarding={})", - session, local, remote, localForwarding); + session, local, remote, localForwarding); } @Override - public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local, + public void establishedExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason) - throws IOException { + throws IOException { log.info("establishedExplicitTunnel(session={}, local={}, remote={}, bound={}, localForwarding={}): {}", - session, local, remote, boundAddress, localForwarding, reason); + session, local, remote, boundAddress, localForwarding, reason); } @Override - public void tearingDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address, - boolean localForwarding) throws IOException { + public void tearingDownExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding) + throws IOException { log.info("tearingDownExplicitTunnel(session={}, address={}, localForwarding={})", session, address, localForwarding); } @Override - public void tornDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address, - boolean localForwarding, Throwable reason) throws IOException { + public void tornDownExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason) + throws IOException { log.info("tornDownExplicitTunnel(session={}, address={}, localForwarding={}, reason={})", - session, address, localForwarding, reason); + session, address, localForwarding, reason); } @Override - public void establishingDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local) - throws IOException { + public void establishingDynamicTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress local) + throws IOException { log.info("establishingDynamicTunnel(session={}, local={})", session, local); } @Override - public void establishedDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local, - SshdSocketAddress boundAddress, Throwable reason) throws IOException { + public void establishedDynamicTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason) + throws IOException { log.info("establishedDynamicTunnel(session={}, local={}, bound={}, reason={})", session, local, boundAddress, reason); } @@ -123,8 +129,9 @@ public class PortForwardingLoadTest extends BaseTestSupport { } @Override - public void tornDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address, - Throwable reason) throws IOException { + public void tornDownDynamicTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) + throws IOException { log.info("tornDownDynamicTunnel(session={}, address={}, reason={})", session, address, reason); } }; @@ -191,7 +198,11 @@ public class PortForwardingLoadTest extends BaseTestSupport { for (int i = 0; i < 1000; i++) { sb.append(payloadTmpData); } - final String payload = sb.toString(); + String payload = sb.toString(); + + final byte[] dataBytes = payload.getBytes(StandardCharsets.UTF_8); + final int reportPhase = dataBytes.length / 10; + log.info("{} using payload size={}", getCurrentTestName(), dataBytes.length); Session session = createSession(); try (ServerSocket ss = new ServerSocket()) { @@ -199,8 +210,11 @@ public class PortForwardingLoadTest extends BaseTestSupport { ss.bind(new InetSocketAddress((InetAddress) null, 0)); int forwardedPort = ss.getLocalPort(); int sinkPort = session.setPortForwardingL(0, TEST_LOCALHOST, forwardedPort); - final AtomicInteger conCount = new AtomicInteger(0); - final Semaphore iterationsSignal = new Semaphore(0); + log.info("{} forwardedPort={}, sinkPort={}", getCurrentTestName(), forwardedPort, sinkPort); + + AtomicInteger conCount = new AtomicInteger(0); + Semaphore iterationsSignal = new Semaphore(0); + @SuppressWarnings("checkstyle:anoninnerlength") Thread tAcceptor = new Thread(getCurrentTestName() + "Acceptor") { @SuppressWarnings("synthetic-access") @Override @@ -210,30 +224,44 @@ public class PortForwardingLoadTest extends BaseTestSupport { log.info("Started..."); for (int i = 0; i < numIterations; ++i) { try (Socket s = ss.accept()) { - conCount.incrementAndGet(); + int totalConns = conCount.incrementAndGet(); + log.info("Accepted connection #{} from {}", totalConns, s.getRemoteSocketAddress()); try (InputStream sockIn = s.getInputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - while (baos.size() < payload.length()) { + for (int readSize = 0, lastReport = 0; readSize < dataBytes.length;) { int l = sockIn.read(buf); if (l < 0) { break; } + baos.write(buf, 0, l); + readSize += l; + + if ((readSize - lastReport) >= reportPhase) { + log.info("Read {}/{} bytes of iteration #{}", readSize, dataBytes.length, i); + lastReport = readSize; + } } - assertEquals("Mismatched received data at iteration #" + i, payload, baos.toString()); + assertPayloadEquals("Mismatched received data at iteration #" + i, dataBytes, baos.toByteArray()); - try (InputStream inputCopy = new ByteArrayInputStream(baos.toByteArray()); + byte[] outBytes = baos.toByteArray(); + try (InputStream inputCopy = new ByteArrayInputStream(outBytes); OutputStream sockOut = s.getOutputStream()) { - while (true) { + for (int writeSize = 0, lastReport = 0; writeSize < outBytes.length;) { int l = sockIn.read(buf); if (l < 0) { break; } sockOut.write(buf, 0, l); + writeSize += l; + if ((writeSize - lastReport) >= reportPhase) { + log.info("Written {}/{} bytes of iteration #{}", writeSize, dataBytes.length, i); + lastReport = writeSize; + } } } } @@ -248,30 +276,38 @@ public class PortForwardingLoadTest extends BaseTestSupport { } }; tAcceptor.start(); - Thread.sleep(TimeUnit.SECONDS.toMillis(1L)); + Thread.sleep(TimeUnit.SECONDS.toMillis(3L)); byte[] buf = new byte[8192]; - byte[] bytes = payload.getBytes(StandardCharsets.UTF_8); for (int i = 0; i < numIterations; i++) { - log.info("Iteration {}", i); + log.info("Iteration {} started", i); try (Socket s = new Socket(TEST_LOCALHOST, sinkPort); OutputStream sockOut = s.getOutputStream()) { + log.info("Iteration {} connected to {}", i, s.getRemoteSocketAddress()); s.setSoTimeout((int) FactoryManager.DEFAULT_NIO2_MIN_WRITE_TIMEOUT); - sockOut.write(bytes); + sockOut.write(dataBytes); sockOut.flush(); + log.info("Iteration {} awaiting echoed data", i); try (InputStream sockIn = s.getInputStream(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length)) { - while (baos.size() < payload.length()) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(dataBytes.length)) { + for (int readSize = 0, lastReport = 0; readSize < dataBytes.length;) { int l = sockIn.read(buf); if (l < 0) { break; } + baos.write(buf, 0, l); + readSize += l; + + if ((readSize - lastReport) >= reportPhase) { + log.info("Read {}/{} bytes of iteration #{}", readSize, dataBytes.length, i); + lastReport = readSize; + } } - assertEquals("Mismatched payload at iteration #" + i, payload, baos.toString()); + assertPayloadEquals("Mismatched payload at iteration #" + i, dataBytes, baos.toByteArray()); } } catch (Exception e) { log.error("Error in iteration #" + i, e); @@ -280,18 +316,40 @@ public class PortForwardingLoadTest extends BaseTestSupport { try { assertTrue("Failed to await pending iterations=" + numIterations, - iterationsSignal.tryAcquire(numIterations, numIterations, TimeUnit.SECONDS)); + iterationsSignal.tryAcquire(numIterations, numIterations, TimeUnit.SECONDS)); } finally { + log.info("{} remove port forwarding for {}", getCurrentTestName(), sinkPort); session.delPortForwardingL(sinkPort); } ss.close(); + log.info("{} awaiting acceptor finish", getCurrentTestName()); tAcceptor.join(TimeUnit.SECONDS.toMillis(11L)); } finally { session.disconnect(); } } + private static void assertPayloadEquals(String message, byte[] expectedBytes, byte[] actualBytes) { + assertEquals(message + ": mismatched payload length", expectedBytes.length, actualBytes.length); + for (int index = 0; index < expectedBytes.length; index++) { + if (expectedBytes[index] == actualBytes[index]) { + continue; + } + + int startPos = Math.max(0, index - Byte.SIZE); + int endPos = Math.min(startPos + Short.SIZE, expectedBytes.length); + if ((endPos - startPos) < Byte.SIZE) { + startPos = expectedBytes.length - Byte.SIZE; + endPos = expectedBytes.length; + } + + String expected = new String(expectedBytes, startPos, endPos - startPos, StandardCharsets.UTF_8); + String actual = new String(actualBytes, startPos, endPos - startPos, StandardCharsets.UTF_8); + fail("Mismatched data around offset " + index + ": expected='" + expected + "', actual='" + actual + "'"); + } + } + @Test public void testRemoteForwardingPayload() throws Exception { final int numIterations = 100; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/aa7547b7/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java index 9b88a84..450184e 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/forward/PortForwardingTest.java @@ -83,31 +83,35 @@ public class PortForwardingTest extends BaseTestSupport { private final org.slf4j.Logger log = LoggerFactory.getLogger(PortForwardingEventListener.class); @Override - public void establishingExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local, - SshdSocketAddress remote, boolean localForwarding) throws IOException { + public void establishingExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding) + throws IOException { log.info("establishingExplicitTunnel(session={}, local={}, remote={}, localForwarding={})", - session, local, remote, localForwarding); + session, local, remote, localForwarding); } @Override - public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local, + public void establishedExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason) - throws IOException { + throws IOException { log.info("establishedExplicitTunnel(session={}, local={}, remote={}, bound={}, localForwarding={}): {}", - session, local, remote, boundAddress, localForwarding, reason); + session, local, remote, boundAddress, localForwarding, reason); } @Override - public void tearingDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address, - boolean localForwarding) throws IOException { + public void tearingDownExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding) + throws IOException { log.info("tearingDownExplicitTunnel(session={}, address={}, localForwarding={})", session, address, localForwarding); } @Override - public void tornDownExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address, - boolean localForwarding, Throwable reason) throws IOException { + public void tornDownExplicitTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason) + throws IOException { log.info("tornDownExplicitTunnel(session={}, address={}, localForwarding={}, reason={})", - session, address, localForwarding, reason); + session, address, localForwarding, reason); } @Override @@ -117,8 +121,9 @@ public class PortForwardingTest extends BaseTestSupport { } @Override - public void establishedDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local, - SshdSocketAddress boundAddress, Throwable reason) throws IOException { + public void establishedDynamicTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason) + throws IOException { log.info("establishedDynamicTunnel(session={}, local={}, bound={}, reason={})", session, local, boundAddress, reason); } @@ -129,8 +134,9 @@ public class PortForwardingTest extends BaseTestSupport { } @Override - public void tornDownDynamicTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress address, - Throwable reason) throws IOException { + public void tornDownDynamicTunnel( + org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) + throws IOException { log.info("tornDownDynamicTunnel(session={}, address={}, reason={})", session, address, reason); } }; @@ -487,30 +493,31 @@ public class PortForwardingTest extends BaseTestSupport { @Test public void testLocalForwardingNative() throws Exception { - final AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>(); - final AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>(); - final AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>(); - final AtomicInteger tearDownSignal = new AtomicInteger(0); + AtomicReference<SshdSocketAddress> localAddressHolder = new AtomicReference<>(); + AtomicReference<SshdSocketAddress> remoteAddressHolder = new AtomicReference<>(); + AtomicReference<SshdSocketAddress> boundAddressHolder = new AtomicReference<>(); + AtomicInteger tearDownSignal = new AtomicInteger(0); @SuppressWarnings("checkstyle:anoninnerlength") PortForwardingEventListener listener = new PortForwardingEventListener() { @Override public void tornDownExplicitTunnel( org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding, Throwable reason) - throws IOException { + throws IOException { assertTrue("Unexpected remote tunnel has been torn down: address=" + address, localForwarding); assertEquals("Tear down indication not invoked", 1, tearDownSignal.get()); } @Override public void tornDownDynamicTunnel( - org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) throws IOException { + org.apache.sshd.common.session.Session session, SshdSocketAddress address, Throwable reason) + throws IOException { throw new UnsupportedOperationException("Unexpected dynamic tunnel torn down indication: session=" + session + ", address=" + address); } @Override public void tearingDownExplicitTunnel( org.apache.sshd.common.session.Session session, SshdSocketAddress address, boolean localForwarding) - throws IOException { + throws IOException { assertTrue("Unexpected remote tunnel being torn down: address=" + address, localForwarding); assertEquals("Duplicate tear down signalling", 1, tearDownSignal.incrementAndGet()); } @@ -524,7 +531,7 @@ public class PortForwardingTest extends BaseTestSupport { @Override public void establishingExplicitTunnel( org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding) - throws IOException { + throws IOException { assertTrue("Unexpected remote tunnel being established: local=" + local + ", remote=" + remote, localForwarding); assertNull("Duplicate establishment indication call for local address=" + local, localAddressHolder.getAndSet(local)); assertNull("Duplicate establishment indication call for remote address=" + remote, remoteAddressHolder.getAndSet(remote)); @@ -539,7 +546,7 @@ public class PortForwardingTest extends BaseTestSupport { @Override public void establishedExplicitTunnel(org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress remote, boolean localForwarding, SshdSocketAddress boundAddress, Throwable reason) - throws IOException { + throws IOException { assertTrue("Unexpected remote tunnel has been established: local=" + local + ", remote=" + remote + ", bound=" + boundAddress, localForwarding); assertSame("Mismatched established tunnel local address", local, localAddressHolder.get()); assertSame("Mismatched established tunnel remote address", remote, remoteAddressHolder.get()); @@ -549,7 +556,7 @@ public class PortForwardingTest extends BaseTestSupport { @Override public void establishedDynamicTunnel( org.apache.sshd.common.session.Session session, SshdSocketAddress local, SshdSocketAddress boundAddress, Throwable reason) - throws IOException { + throws IOException { throw new UnsupportedOperationException("Unexpected dynamic tunnel established indication: session=" + session + ", address=" + boundAddress); } };
