This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e37c9f83b0144971b8d081ac1b3ba8e53abff859
Author: Andrew Wong <[email protected]>
AuthorDate: Wed Mar 4 00:25:12 2020 -0800

    subprocess: fix TestEchoSubprocess
    
    There were a number of issues with TestEchoSubprocess:
    - Many tests coarsely expected timeouts, leading to the non-execution of
      many parts of the test. Instead, I wrapped executor.run() with an
      executor.runUntilTimeout() that expects the timeout, rather than
      relying on test-level expectation of timeouts.
    - Initializing BufferedInputStream with the same pipe multiple times led
      us to miss out on some bytes in the pipe. This initializes the
      BufferedInputStream once as a member.
    - testMsgWithEmptyMessage() wouldn't time out because its readers were
      stuck parsing an empty pipe. This is resolved by adding more requests
      to the pipe.
    
    Change-Id: I351ae84285fa5eb9db5dcc374dd404e475a9ddb4
    Reviewed-on: http://gerrit.cloudera.org:8080/15362
    Tested-by: Andrew Wong <[email protected]>
    Reviewed-by: Hao Hao <[email protected]>
---
 .../apache/kudu/subprocess/SubprocessExecutor.java | 18 +++++++++
 .../apache/kudu/subprocess/SubprocessTestUtil.java |  9 +++--
 .../kudu/subprocess/echo/TestEchoSubprocess.java   | 43 ++++++++++++----------
 3 files changed, 48 insertions(+), 22 deletions(-)

diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
index bbecfec..b56d301 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,4 +169,21 @@ public class SubprocessExecutor {
   public void blockWriteMs(long blockWriteMs) {
     this.blockWriteMs = blockWriteMs;
   }
+
+  /**
+   * Wrapper around <code>run()</code> that runs until 'timeoutMs' elapses,
+   * swallows the TimeoutException thrown by run(), and returns normally.
+   *
+   * Used in tests. 'timeoutMs' must not be -1 (enforced as a precondition).
+   * TODO(awong): it'd be nice if we had a cleaner way to shut down the executor.
+   */
+  public void runUntilTimeout(String[] args, ProtocolHandler handler, long 
timeoutMs)
+      throws ExecutionException, InterruptedException {
+    Preconditions.checkArgument(timeoutMs != -1);
+    try {
+     run(args, handler, timeoutMs);
+    } catch (TimeoutException e) {
+      // Expected: callers rely on the timeout to end the run, so ignore it.
+    }
+  }
 }
diff --git 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java
 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java
index b7d44c9..1abb019 100644
--- 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java
+++ 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java
@@ -66,8 +66,11 @@ public class SubprocessTestUtil {
   protected PipedOutputStream requestSenderPipe;
 
   // Pipe that we can read from that will receive responses from the
-  // subprocess's output pipe.
+  // subprocess's output pipe. We'll read from it via BufferedInputStream,
+  // so wrap the pipe here.
   protected final PipedInputStream responseReceiverPipe = new 
PipedInputStream();
+  private final BufferedInputStream bufferedInputStream =
+      new BufferedInputStream(responseReceiverPipe);
 
   public static class PrintStreamWithIOException extends PrintStream {
     public PrintStreamWithIOException(OutputStream out, boolean autoFlush, 
String encoding)
@@ -91,8 +94,8 @@ public class SubprocessTestUtil {
   // Receives a response from the receiver pipe and deserializes it into a
   // SubprocessResponsePB.
   public Subprocess.SubprocessResponsePB receiveResponse() throws IOException {
-    BufferedInputStream bufferedInput = new 
BufferedInputStream(responseReceiverPipe);
-    return SubprocessTestUtil.deserializeMessage(bufferedInput, 
Subprocess.SubprocessResponsePB.parser());
+    return SubprocessTestUtil.deserializeMessage(bufferedInputStream,
+                                                 
Subprocess.SubprocessResponsePB.parser());
   }
 
   // Sets up and returns a SubprocessExecutor with the given error handler and
diff --git 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
index 66f7ddc..5f0e3c1 100644
--- 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
+++ 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
@@ -79,13 +79,13 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
    * Test a regular old message. There should be no exceptions of any kind.
    * We should also see some metrics that make sense.
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testBasicMsg() throws Exception {
     SubprocessExecutor executor =
         setUpExecutorIO(NO_ERR, /*injectIOError*/false);
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
 
-    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+    executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
     SubprocessResponsePB spResp = receiveResponse();
     EchoResponsePB echoResp = 
spResp.getResponse().unpack(EchoResponsePB.class);
     Assert.assertEquals(MESSAGE, echoResp.getData());
@@ -111,7 +111,7 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
    * Test to see what happens when the execution is the bottleneck. We should
    * see it in the execution time and inbound queue time and length metrics.
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testSlowExecutionMetrics() throws Exception {
     SubprocessExecutor executor =
       setUpExecutorIO(NO_ERR, /*injectIOError*/false);
@@ -122,7 +122,7 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
 
     // Run the executor with a single parser thread so we can make stronger
     // assumptions about timing.
-    executor.run(new String[]{"-p", "1"}, new EchoProtocolHandler(), 
TIMEOUT_MS);
+    executor.runUntilTimeout(new String[]{"-p", "1"}, new 
EchoProtocolHandler(), TIMEOUT_MS);
 
     SubprocessMetricsPB m = receiveResponse().getMetrics();
     long inboundQueueLength = m.getInboundQueueLength();
@@ -167,16 +167,16 @@ public class TestEchoSubprocess extends 
SubprocessTestUtil {
    * Test to see what happens when writing is the bottleneck. We should see it
    * in the outbound queue metrics.
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testSlowWriterMetrics() throws Exception {
     SubprocessExecutor executor =
-      setUpExecutorIO(NO_ERR, /*injectIOError*/false);
+        setUpExecutorIO(NO_ERR, /*injectIOError*/false);
     final int BLOCK_MS = 200;
     executor.blockWriteMs(BLOCK_MS);
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
-    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+    executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
 
     // In writing the first request, the other two requests should've been
     // close behind, likely both in the outbound queue.
@@ -186,29 +186,34 @@ public class TestEchoSubprocess extends 
SubprocessTestUtil {
     m = receiveResponse().getMetrics();
     Assert.assertEquals(1, m.getOutboundQueueLength());
     Assert.assertTrue(
-      String.format("Expected a higher outbound queue time: %s ms", 
m.getOutboundQueueTimeMs()),
-      m.getOutboundQueueTimeMs() >= BLOCK_MS);
+        String.format("Expected a higher outbound queue time: %s ms", 
m.getOutboundQueueTimeMs()),
+        m.getOutboundQueueTimeMs() >= BLOCK_MS);
 
     m = receiveResponse().getMetrics();
     Assert.assertEquals(0, m.getOutboundQueueLength());
     Assert.assertTrue(
-      String.format("Expected a higher outbound queue time: %s ms", 
m.getOutboundQueueTimeMs()),
-      m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS);
+        String.format("Expected a higher outbound queue time: %s ms", 
m.getOutboundQueueTimeMs()),
+        m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS);
   }
 
   /**
    * Test what happens when we send a message that is completely empty (i.e.
    * not an empty SubprocessRequestPB message -- no message at all).
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testMsgWithEmptyMessage() throws Exception {
     SubprocessExecutor executor = setUpExecutorIO(NO_ERR,
                                                   /*injectIOError*/false);
     requestSenderPipe.write(MessageIO.intToBytes(0));
-    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+    // NOTE: reading IO when the pipe is virtually empty leads us to hang. So
+    // let's put something else onto the pipe and just ensure that our empty
+    // message was a no-op.
+    sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
+    executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
 
-    // We should see no bytes land in the receiver pipe.
-    Assert.assertEquals(-1, responseReceiverPipe.read());
+    SubprocessResponsePB spResp = receiveResponse();
+    EchoResponsePB echoResp = 
spResp.getResponse().unpack(EchoResponsePB.class);
+    Assert.assertEquals(MESSAGE, echoResp.getData());
   }
 
   /**
@@ -237,8 +242,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     // CompletableFuture because, in waiting for completion, the MessageReader
     // times out before CompletableFuture.get() is called on the writer.
     assertIncludingSuppressedThrows(IOException.class,
-      "Unable to write to print stream",
-      () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
+        "Unable to write to print stream",
+        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
   }
 
   /**
@@ -261,14 +266,14 @@ public class TestEchoSubprocess extends 
SubprocessTestUtil {
    * <code>MessageParser</code> tasks can continue making progress.
    */
   @Test
-  public void testMessageParser() throws Exception  {
+  public void testSlowWriterDoesntBlockQueues() throws Exception  {
     SubprocessExecutor executor =
         setUpExecutorIO(NO_ERR, /*injectIOError*/false);
     sendRequestToPipe(createEchoSubprocessRequest("a"));
     sendRequestToPipe(createEchoSubprocessRequest("b"));
     executor.blockWriteMs(1000);
     Assert.assertThrows(TimeoutException.class,
-        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), 
/*timeoutMs*/500));
+        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
 
     // We should see a single message in the outbound queue. The other one is
     // blocked writing.

Reply via email to