This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit e37c9f83b0144971b8d081ac1b3ba8e53abff859 Author: Andrew Wong <[email protected]> AuthorDate: Wed Mar 4 00:25:12 2020 -0800 subprocess: fix TestEchoSubprocess There were a number of issues with TestEchoSubprocess: - Many tests coarsely expected timeouts, leading to the non-execution of many parts of the test. Instead, I wrapped executor.run() with an executor.runUntilTimeout() that expects the timeout, rather than relying on test-level expectation of timeouts. - Initializing BufferedInputStream with the same pipe multiple times led us to miss out on some bytes in the pipe. This initializes the BufferedInputStream once as a member. - testMsgWithEmptyMessage() wouldn't time out because its readers were stuck parsing an empty pipe. This is resolved by adding more requests to the pipe. Change-Id: I351ae84285fa5eb9db5dcc374dd404e475a9ddb4 Reviewed-on: http://gerrit.cloudera.org:8080/15362 Tested-by: Andrew Wong <[email protected]> Reviewed-by: Hao Hao <[email protected]> --- .../apache/kudu/subprocess/SubprocessExecutor.java | 18 +++++++++ .../apache/kudu/subprocess/SubprocessTestUtil.java | 9 +++-- .../kudu/subprocess/echo/TestEchoSubprocess.java | 43 ++++++++++++---------- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java index bbecfec..b56d301 100644 --- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,4 +169,21 @@ public class 
SubprocessExecutor { public void blockWriteMs(long blockWriteMs) { this.blockWriteMs = blockWriteMs; } + + /** + * Wrapper around <code>run()</code> that runs until 'timeoutMs' elapses, + * catches any timeout exceptions, and returns. + * + * Used in tests. + * TODO(awong): it'd be nice if we had a nicer way to shut down the executor. + */ + public void runUntilTimeout(String[] args, ProtocolHandler handler, long timeoutMs) + throws ExecutionException, InterruptedException { + Preconditions.checkArgument(timeoutMs != -1); + try { + run(args, handler, timeoutMs); + } catch (TimeoutException e) { + // no-op + } + } } diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java index b7d44c9..1abb019 100644 --- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java +++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java @@ -66,8 +66,11 @@ public class SubprocessTestUtil { protected PipedOutputStream requestSenderPipe; // Pipe that we can read from that will receive responses from the - // subprocess's output pipe. + // subprocess's output pipe. We'll read from it via BufferedInputStream, + // so wrap the pipe here. protected final PipedInputStream responseReceiverPipe = new PipedInputStream(); + private final BufferedInputStream bufferedInputStream = + new BufferedInputStream(responseReceiverPipe); public static class PrintStreamWithIOException extends PrintStream { public PrintStreamWithIOException(OutputStream out, boolean autoFlush, String encoding) @@ -91,8 +94,8 @@ public class SubprocessTestUtil { // Receives a response from the receiver pipe and deserializes it into a // SubprocessResponsePB. 
public Subprocess.SubprocessResponsePB receiveResponse() throws IOException { - BufferedInputStream bufferedInput = new BufferedInputStream(responseReceiverPipe); - return SubprocessTestUtil.deserializeMessage(bufferedInput, Subprocess.SubprocessResponsePB.parser()); + return SubprocessTestUtil.deserializeMessage(bufferedInputStream, + Subprocess.SubprocessResponsePB.parser()); } // Sets up and returns a SubprocessExecutor with the given error handler and diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java index 66f7ddc..5f0e3c1 100644 --- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java +++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java @@ -79,13 +79,13 @@ public class TestEchoSubprocess extends SubprocessTestUtil { * Test a regular old message. There should be no exceptions of any kind. * We should also see some metrics that make sense. */ - @Test(expected = TimeoutException.class) + @Test public void testBasicMsg() throws Exception { SubprocessExecutor executor = setUpExecutorIO(NO_ERR, /*injectIOError*/false); sendRequestToPipe(createEchoSubprocessRequest(MESSAGE)); - executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS); + executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS); SubprocessResponsePB spResp = receiveResponse(); EchoResponsePB echoResp = spResp.getResponse().unpack(EchoResponsePB.class); Assert.assertEquals(MESSAGE, echoResp.getData()); @@ -111,7 +111,7 @@ public class TestEchoSubprocess extends SubprocessTestUtil { * Test to see what happens when the execution is the bottleneck. We should * see it in the execution time and inbound queue time and length metrics. 
*/ - @Test(expected = TimeoutException.class) + @Test public void testSlowExecutionMetrics() throws Exception { SubprocessExecutor executor = setUpExecutorIO(NO_ERR, /*injectIOError*/false); @@ -122,7 +122,7 @@ public class TestEchoSubprocess extends SubprocessTestUtil { // Run the executor with a single parser thread so we can make stronger // assumptions about timing. - executor.run(new String[]{"-p", "1"}, new EchoProtocolHandler(), TIMEOUT_MS); + executor.runUntilTimeout(new String[]{"-p", "1"}, new EchoProtocolHandler(), TIMEOUT_MS); SubprocessMetricsPB m = receiveResponse().getMetrics(); long inboundQueueLength = m.getInboundQueueLength(); @@ -167,16 +167,16 @@ public class TestEchoSubprocess extends SubprocessTestUtil { * Test to see what happens when writing is the bottleneck. We should see it * in the outbound queue metrics. */ - @Test(expected = TimeoutException.class) + @Test public void testSlowWriterMetrics() throws Exception { SubprocessExecutor executor = - setUpExecutorIO(NO_ERR, /*injectIOError*/false); + setUpExecutorIO(NO_ERR, /*injectIOError*/false); final int BLOCK_MS = 200; executor.blockWriteMs(BLOCK_MS); sendRequestToPipe(createEchoSubprocessRequest(MESSAGE)); sendRequestToPipe(createEchoSubprocessRequest(MESSAGE)); sendRequestToPipe(createEchoSubprocessRequest(MESSAGE)); - executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS); + executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS); // In writing the first request, the other two requests should've been // close behind, likely both in the outbound queue. 
@@ -186,29 +186,34 @@ public class TestEchoSubprocess extends SubprocessTestUtil { m = receiveResponse().getMetrics(); Assert.assertEquals(1, m.getOutboundQueueLength()); Assert.assertTrue( - String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()), - m.getOutboundQueueTimeMs() >= BLOCK_MS); + String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()), + m.getOutboundQueueTimeMs() >= BLOCK_MS); m = receiveResponse().getMetrics(); Assert.assertEquals(0, m.getOutboundQueueLength()); Assert.assertTrue( - String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()), - m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS); + String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()), + m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS); } /** * Test what happens when we send a message that is completely empty (i.e. * not an empty SubprocessRequestPB message -- no message at all). */ - @Test(expected = TimeoutException.class) + @Test public void testMsgWithEmptyMessage() throws Exception { SubprocessExecutor executor = setUpExecutorIO(NO_ERR, /*injectIOError*/false); requestSenderPipe.write(MessageIO.intToBytes(0)); - executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS); + // NOTE: reading IO when the pipe is virtually empty leads us to hang. So + // let's put something else onto the pipe and just ensure that our empty + // message was a no-op. + sendRequestToPipe(createEchoSubprocessRequest(MESSAGE)); + executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS); - // We should see no bytes land in the receiver pipe. 
- Assert.assertEquals(-1, responseReceiverPipe.read()); + SubprocessResponsePB spResp = receiveResponse(); + EchoResponsePB echoResp = spResp.getResponse().unpack(EchoResponsePB.class); + Assert.assertEquals(MESSAGE, echoResp.getData()); } /** @@ -237,8 +242,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil { // CompletableFuture because, in waiting for completion, the MessageReader // times out before CompletableFuture.get() is called on the writer. assertIncludingSuppressedThrows(IOException.class, - "Unable to write to print stream", - () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS)); + "Unable to write to print stream", + () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS)); } /** @@ -261,14 +266,14 @@ public class TestEchoSubprocess extends SubprocessTestUtil { * <code>MessageParser</code> tasks can continue making progress. */ @Test - public void testMessageParser() throws Exception { + public void testSlowWriterDoesntBlockQueues() throws Exception { SubprocessExecutor executor = setUpExecutorIO(NO_ERR, /*injectIOError*/false); sendRequestToPipe(createEchoSubprocessRequest("a")); sendRequestToPipe(createEchoSubprocessRequest("b")); executor.blockWriteMs(1000); Assert.assertThrows(TimeoutException.class, - () -> executor.run(NO_ARGS, new EchoProtocolHandler(), /*timeoutMs*/500)); + () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS)); // We should see a single message in the outbound queue. The other one is // blocked writing.
