This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1cc18b057ed706238e71123ae7c0cd4a8778613c
Author: Hao Hao <hao....@cloudera.com>
AuthorDate: Tue Feb 11 23:02:43 2020 -0800

    [java] KUDU-2971: add an outbound queue
    
    This commit updates the writer tasks to put the responses on an outbound
    queue, and uses a separate thread to poll the queue and write the messages
    to stdout, so that if stdout fills up, the message parser tasks can
    continue processing requests without blocking.
    
    Change-Id: I0793ad35af5342bb9af4f2f0eaed4090f78ef3c6
    Reviewed-on: http://gerrit.cloudera.org:8080/15210
    Reviewed-by: Adar Dembo <a...@cloudera.com>
    Reviewed-by: Attila Bukor <abu...@apache.org>
    Tested-by: Hao Hao <hao....@cloudera.com>
---
 .../kudu/subprocess/echo/TestEchoSubprocess.java   | 45 +++++++++--
 .../{MessageWriter.java => MessageParser.java}     | 43 +++--------
 .../org/apache/kudu/subprocess/MessageReader.java  | 21 +----
 .../org/apache/kudu/subprocess/MessageWriter.java  | 89 +++++-----------------
 .../java/org/apache/kudu/subprocess/QueueUtil.java | 62 +++++++++++++++
 .../kudu/subprocess/SubprocessConfiguration.java   | 24 +++---
 .../apache/kudu/subprocess/SubprocessExecutor.java | 67 +++++++++++-----
 .../org/apache/kudu/subprocess/TestMessageIO.java  |  6 +-
 8 files changed, 199 insertions(+), 158 deletions(-)

diff --git 
a/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
 
b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
index 7151261..0e9b5ef 100644
--- 
a/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
+++ 
b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
@@ -24,10 +24,12 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
+import com.google.common.primitives.Bytes;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.kudu.subprocess.KuduSubprocessException;
 import org.apache.kudu.subprocess.MessageIO;
 import org.apache.kudu.subprocess.MessageTestUtil;
+import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
 import org.apache.kudu.subprocess.SubprocessExecutor;
 import org.apache.kudu.test.junit.RetryRule;
 
@@ -124,10 +127,10 @@ public class TestEchoSubprocess {
     final InputStream in = new ByteArrayInputStream(messageBytes);
     final PrintStream out =
             new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
-    final String[] args = {""};
     Throwable thrown = Assert.assertThrows(ExecutionException.class, new 
ThrowingRunnable() {
       @Override
       public void run() throws Exception {
+        final String[] args = {""};
         runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */false);
       }
     });
@@ -146,12 +149,10 @@ public class TestEchoSubprocess {
     final InputStream in = new ByteArrayInputStream(messageBytes);
     final PrintStream out =
             new PrintStreamWithIOException(new ByteArrayOutputStream(), false, 
"UTF-8");
-    // Only use one writer task to avoid get TimeoutException instead for
-    // writer tasks that haven't encountered any exceptions.
-    final String[] args = {"-w", "1"};
     Throwable thrown = Assert.assertThrows(ExecutionException.class, new 
ThrowingRunnable() {
       @Override
       public void run() throws Exception {
+        final String[] args = {""};
         runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */false);
       }
     });
@@ -169,14 +170,46 @@ public class TestEchoSubprocess {
         MessageTestUtil.createEchoSubprocessRequest(message));
     final InputStream in = new ByteArrayInputStream(messageBytes);
     final PrintStream out =
-            new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
-    final String[] args = {""};
+        new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
     Throwable thrown = Assert.assertThrows(ExecutionException.class, new 
ThrowingRunnable() {
       @Override
       public void run() throws Exception {
+        final String[] args = {""};
         runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */true);
       }
     });
     Assert.assertTrue(thrown.getMessage().contains("Unable to put the message 
to the queue"));
   }
+
+  /**
+   * Verifies when <code>MessageWriter</code> task is blocking on writing the 
message,
+   * <code>MessageParser</code> task can continue processing the requests 
without
+   * blocking.
+   */
+  @Test
+  public void testMessageParser() throws Exception  {
+    final byte[] messageBytes = Bytes.concat(
+        
MessageTestUtil.serializeMessage(MessageTestUtil.createEchoSubprocessRequest("a")),
+        
MessageTestUtil.serializeMessage(MessageTestUtil.createEchoSubprocessRequest("b")));
+    final InputStream in = new ByteArrayInputStream(messageBytes);
+    final PrintStream out =
+        new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
+    final SubprocessExecutor[] executors = new SubprocessExecutor[1];
+    System.setIn(in);
+    System.setOut(out);
+    Assert.assertThrows(TimeoutException.class, new ThrowingRunnable() {
+      @Override
+      public void run() throws Exception {
+        final String[] args = {""};
+        executors[0] = new SubprocessExecutor(NO_ERR);
+        // Block the message write for 1000 Ms.
+        executors[0].blockWriteMs(1000);
+        executors[0].run(args, new EchoProtocolHandler(), /* timeoutMs= */500);
+      }
+    });
+
+    // Verify that the message have been processed and placed to outbound 
queue.
+    BlockingQueue<SubprocessResponsePB> outboundQueue = 
executors[0].getOutboundQueue();
+    Assert.assertEquals(1, outboundQueue.size());
+  }
 }
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageParser.java
similarity index 73%
copy from 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
copy to 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageParser.java
index e53bbf4..6dc5f00 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageParser.java
@@ -17,7 +17,6 @@
 
 package org.apache.kudu.subprocess;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.BlockingQueue;
 
@@ -32,52 +31,34 @@ import 
org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB;
 import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
 
 /**
- * The {@link MessageWriter} class,
- *    1. retrieves one message from the queue at a time,
+ * The {@link MessageParser} class,
+ *    1. retrieves one message from the inbound queue at a time,
  *    2. processes the message and generates a response,
- *    3. and then writes the response to the underlying output stream.
+ *    3. and then puts the response to the outbound queue.
  */
 @InterfaceAudience.Private
-class MessageWriter implements Runnable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(MessageWriter.class);
+class MessageParser implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MessageParser.class);
   private final BlockingQueue<byte[]> inboundQueue;
-  private final MessageIO messageIO;
+  private final BlockingQueue<SubprocessResponsePB> outboundQueue;
   private final ProtocolHandler protocolHandler;
 
-  MessageWriter(BlockingQueue<byte[]> inboundQueue,
-                MessageIO messageIO,
+  MessageParser(BlockingQueue<byte[]> inboundQueue,
+                BlockingQueue<SubprocessResponsePB> outboundQueue,
                 ProtocolHandler protocolHandler) {
     Preconditions.checkNotNull(inboundQueue);
+    Preconditions.checkNotNull(outboundQueue);
     this.inboundQueue = inboundQueue;
-    this.messageIO = messageIO;
+    this.outboundQueue = outboundQueue;
     this.protocolHandler = protocolHandler;
   }
 
   @Override
   public void run() {
     while (true) {
-      // Take an element from the queue. If encountered InterruptedException,
-      // consider it to be fatal (as a signal to shutdown the task), and
-      // propagate it up the call stack.
-      byte[] data;
-      try {
-        data = inboundQueue.take();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Message: {} has been taken from the queue", data);
-        }
-      } catch (InterruptedException e) {
-        throw new KuduSubprocessException("Unable to take a message from the 
queue", e);
-      }
-
+      byte[] data = QueueUtil.take(inboundQueue);
       SubprocessResponsePB response = getResponse(data);
-
-      // Writes the response to the underlying output stream. IOException is 
fatal,
-      // and should be propagated up the call stack.
-      try {
-        messageIO.writeMessage(response);
-      } catch (IOException e) {
-        throw new KuduSubprocessException("Unable to write the protobuf 
message", e);
-      }
+      QueueUtil.put(outboundQueue, response);
     }
   }
 
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
index d69ab38..979cc18 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
@@ -21,7 +21,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The {@link MessageReader} class,
  *   1. processes a message that reads from the underlying input stream.
- *   2. and then puts it to the incoming message queue.
+ *   2. and then puts it to the inbound message queue.
  *
  * Since {@link MessageIO#readBytes()} is not atomic, the implementation
  * of MessageReader is not thread-safe, and thus MessageReader should not
@@ -41,9 +40,7 @@ class MessageReader implements Runnable {
   private static final Logger LOG = 
LoggerFactory.getLogger(MessageReader.class);
   private final BlockingQueue<byte[]> inboundQueue;
   private final MessageIO messageIO;
-
-  @VisibleForTesting
-  private boolean injectInterrupt;
+  private final boolean injectInterrupt;
 
   MessageReader(BlockingQueue<byte[]> inboundQueue,
                 MessageIO messageIO,
@@ -77,22 +74,12 @@ class MessageReader implements Runnable {
         throw new KuduSubprocessException("Unable to read the protobuf 
message", e);
       }
 
-      // Put the message to the queue. If encountered InterruptedException
-      // during the put, consider it to be fatal (as a signal to shutdown
-      // the task), and propagate it up the call stack. Log a warning for
-      // empty message which is not expected.
+      // Log a warning for empty message which is not expected.
       if (data.length == 0) {
         LOG.warn("Empty message received.");
         continue;
       }
-      try {
-        inboundQueue.put(data);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Message: {} has been put on the queue", data);
-        }
-      } catch (InterruptedException e) {
-        throw new KuduSubprocessException("Unable to put the message to the 
queue", e);
-      }
+      QueueUtil.put(inboundQueue, data);
     }
   }
 }
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
index e53bbf4..227fba4 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
@@ -18,104 +18,51 @@
 package org.apache.kudu.subprocess;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.concurrent.BlockingQueue;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.WireProtocol.AppStatusPB;
-import org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB;
 import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
 
 /**
  * The {@link MessageWriter} class,
- *    1. retrieves one message from the queue at a time,
- *    2. processes the message and generates a response,
- *    3. and then writes the response to the underlying output stream.
+ *    1. retrieves one message from the outbound queue at a time,
+ *    2. and then writes the response to the underlying output stream.
  */
 @InterfaceAudience.Private
 class MessageWriter implements Runnable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(MessageWriter.class);
-  private final BlockingQueue<byte[]> inboundQueue;
+  private final BlockingQueue<SubprocessResponsePB> outboundQueue;
   private final MessageIO messageIO;
-  private final ProtocolHandler protocolHandler;
+  private final long blockWriteMs;
 
-  MessageWriter(BlockingQueue<byte[]> inboundQueue,
+  MessageWriter(BlockingQueue<SubprocessResponsePB> outboundQueue,
                 MessageIO messageIO,
-                ProtocolHandler protocolHandler) {
-    Preconditions.checkNotNull(inboundQueue);
-    this.inboundQueue = inboundQueue;
+                long blockWriteMs) {
+    Preconditions.checkNotNull(outboundQueue);
+    Preconditions.checkNotNull(messageIO);
+    this.outboundQueue = outboundQueue;
     this.messageIO = messageIO;
-    this.protocolHandler = protocolHandler;
+    this.blockWriteMs = blockWriteMs;
   }
 
   @Override
   public void run() {
     while (true) {
-      // Take an element from the queue. If encountered InterruptedException,
-      // consider it to be fatal (as a signal to shutdown the task), and
-      // propagate it up the call stack.
-      byte[] data;
-      try {
-        data = inboundQueue.take();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Message: {} has been taken from the queue", data);
-        }
-      } catch (InterruptedException e) {
-        throw new KuduSubprocessException("Unable to take a message from the 
queue", e);
-      }
+      SubprocessResponsePB response = QueueUtil.take(outboundQueue);
 
-      SubprocessResponsePB response = getResponse(data);
-
-      // Writes the response to the underlying output stream. IOException is 
fatal,
+      // Write the response to the underlying output stream. IOException is 
fatal,
       // and should be propagated up the call stack.
       try {
         messageIO.writeMessage(response);
-      } catch (IOException e) {
+        // Block the write for the given milliseconds if needed (for tests 
only).
+        // -1 means the write will not be blocked.
+        if (blockWriteMs != -1) {
+          Thread.sleep(blockWriteMs);
+        }
+      } catch (IOException | InterruptedException e) {
         throw new KuduSubprocessException("Unable to write the protobuf 
message", e);
       }
     }
   }
-
-  /**
-   * Constructs a message with the given error status.
-   *
-   * @param errorCode the given error status
-   * @param resp the message builder
-   * @return a message with the given error status
-   */
-  static SubprocessResponsePB responseWithError(AppStatusPB.ErrorCode 
errorCode,
-                                                SubprocessResponsePB.Builder 
resp) {
-    Preconditions.checkNotNull(resp);
-    AppStatusPB.Builder errorBuilder = AppStatusPB.newBuilder();
-    errorBuilder.setCode(errorCode);
-    resp.setError(errorBuilder);
-    return resp.build();
-  }
-
-  /**
-   * Parses the given protobuf message. If encountered 
InvalidProtocolBufferException,
-   * which indicates the message is invalid, respond with an error message.
-   *
-   * @param data the protobuf message
-   * @return a SubprocessResponsePB
-   */
-  private SubprocessResponsePB getResponse(byte[] data) {
-    SubprocessResponsePB response;
-    SubprocessResponsePB.Builder responseBuilder = 
SubprocessResponsePB.newBuilder();
-    try {
-      // Parses the data as a message of SubprocessRequestPB type.
-      SubprocessRequestPB request = 
SubprocessRequestPB.parser().parseFrom(data);
-      response = protocolHandler.handleRequest(request);
-    } catch (InvalidProtocolBufferException e) {
-      LOG.warn(String.format("%s: %s", "Unable to parse the protobuf message",
-                             new String(data, StandardCharsets.UTF_8)), e);
-      response = responseWithError(AppStatusPB.ErrorCode.ILLEGAL_STATE, 
responseBuilder);
-    }
-    return response;
-  }
 }
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/QueueUtil.java 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/QueueUtil.java
new file mode 100644
index 0000000..f06efd1
--- /dev/null
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/QueueUtil.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.subprocess;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Util class for taking and putting messages to a queue.
+ */
+@InterfaceAudience.Private
+public class QueueUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(QueueUtil.class);
+
+  static <DataT> DataT take(BlockingQueue<DataT> queue) {
+    // Take an element from the queue. If encountered InterruptedException,
+    // consider it to be fatal (as a signal to shutdown the task), and
+    // propagate it up the call stack.
+    DataT data;
+    try {
+      data = queue.take();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Message: {} has been taken from the queue", data);
+      }
+    } catch (InterruptedException e) {
+      throw new KuduSubprocessException("Unable to take a message from the 
queue", e);
+    }
+    return data;
+  }
+
+  static <DataT> void put(BlockingQueue<DataT> queue, DataT data) {
+    // Put the message to the queue. If encountered InterruptedException
+    // during the put, consider it to be fatal (as a signal to shutdown
+    // the task), and propagate it up the call stack.
+    try {
+      queue.put(data);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Message: {} has been put on the queue", data);
+      }
+    } catch (InterruptedException e) {
+      throw new KuduSubprocessException("Unable to put the message to the 
queue", e);
+    }
+  }
+}
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
index dc2e72c..f7e8991 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
@@ -33,8 +33,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 public class SubprocessConfiguration {
   private int queueSize;
   private static final int QUEUE_SIZE_DEFAULT = 100;
-  private int maxWriterThreads;
-  private static final int MAX_WRITER_THREADS_DEFAULT = 3;
+  private int maxMsgParserThreads;
+  private static final int MAX_MSG_PARSER_THREADS_DEFAULT = 3;
   private int maxMsgBytes;
 
   @VisibleForTesting
@@ -53,11 +53,11 @@ public class SubprocessConfiguration {
   }
 
   /**
-   * @return the maximum number of threads in the writer thread pool, or the
-   * default value if not provided
+   * @return the maximum number of threads in the message parser thread pool,
+   * or the default value if not provided
    */
-  int getMaxWriterThreads() {
-    return maxWriterThreads;
+  int getMaxMsgParserThreads() {
+    return maxMsgParserThreads;
   }
 
   /**
@@ -85,10 +85,10 @@ public class SubprocessConfiguration {
     queueSizeOpt.setRequired(false);
     options.addOption(queueSizeOpt);
 
-    final String maxWriterThreadsLongOpt = "maxWriterThreads";
+    final String maxMsgParserThreadsLongOpt = "maxMsgParserThreads";
     Option maxThreadsOpt = new Option(
-        "w", maxWriterThreadsLongOpt, /* hasArg= */true,
-        "Maximum number of threads in the writer thread pool for subprocess");
+        "p", maxMsgParserThreadsLongOpt, /* hasArg= */true,
+        "Maximum number of threads in the message parser thread pool for 
subprocess");
     maxThreadsOpt.setRequired(false);
     options.addOption(maxThreadsOpt);
 
@@ -103,12 +103,12 @@ public class SubprocessConfiguration {
     try {
       CommandLine cmd = parser.parse(options, args);
       String queueSize = cmd.getOptionValue(queueSizeLongOpt);
-      String maxWriterThreads = cmd.getOptionValue(maxWriterThreadsLongOpt);
+      String maxParserThreads = cmd.getOptionValue(maxMsgParserThreadsLongOpt);
       String maxMsgBytes = cmd.getOptionValue(maxMsgBytesLongOpt);
       this.queueSize = queueSize == null ?
           QUEUE_SIZE_DEFAULT : Integer.parseInt(queueSize);
-      this.maxWriterThreads = maxWriterThreads == null ?
-          MAX_WRITER_THREADS_DEFAULT : Integer.parseInt(maxWriterThreads);
+      this.maxMsgParserThreads = maxParserThreads == null ?
+          MAX_MSG_PARSER_THREADS_DEFAULT : Integer.parseInt(maxParserThreads);
       this.maxMsgBytes = maxMsgBytes == null ?
           MAX_MESSAGE_BYTES_DEFAULT : Integer.parseInt(maxMsgBytes);
     } catch (ParseException e) {
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
index d531850..d329d21 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
@@ -37,21 +37,27 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
+
 /**
  * The {@link SubprocessExecutor} class,
  *    1. parses the command line to get the configuration,
  *    2. has a single reader thread that continuously reads protobuf-based
- *       messages from the standard input and puts the messages to a FIFO
+ *       messages from standard input and puts the messages to a FIFO inbound
  *       blocking queue,
- *    3. has multiple writer threads that continuously retrieve the messages
- *       from the queue, process them and write the responses to the
- *       standard output.
+ *    3. has multiple parser threads that continuously retrieve the messages
+ *       from the inbound queue, process them and put the responses to a FIFO
+ *       outbound blocking queue,
+ *    4. has a single writer thread that continuously retrieves the responses
+ *       from the outbound queue, and writes the responses to standard output.
  */
 @InterfaceAudience.Private
 public class SubprocessExecutor {
   private static final Logger LOG = 
LoggerFactory.getLogger(SubprocessExecutor.class);
   private final Function<Throwable, Void> errorHandler;
   private boolean injectInterrupt = false;
+  private long blockWriteMs = -1;
+  private BlockingQueue<SubprocessResponsePB> outboundQueue;
 
   public SubprocessExecutor() {
     errorHandler = (t) -> {
@@ -82,13 +88,15 @@ public class SubprocessExecutor {
   public void run(String[] args, ProtocolHandler protocolHandler, long 
timeoutMs)
       throws InterruptedException, ExecutionException, TimeoutException {
     SubprocessConfiguration conf = new SubprocessConfiguration(args);
-    int maxWriterThread = conf.getMaxWriterThreads();
+    int maxMsgParserThread = conf.getMaxMsgParserThreads();
     int queueSize = conf.getQueueSize();
     int maxMessageBytes = conf.getMaxMessageBytes();
 
     BlockingQueue<byte[]> inboundQueue = new ArrayBlockingQueue<>(queueSize, 
/* fair= */true);
+    outboundQueue = new ArrayBlockingQueue<>(queueSize, /* fair= */true);
     ExecutorService readerService = Executors.newSingleThreadExecutor();
-    ExecutorService writerService = 
Executors.newFixedThreadPool(maxWriterThread);
+    ExecutorService parserService = 
Executors.newFixedThreadPool(maxMsgParserThread);
+    ExecutorService writerService = Executors.newSingleThreadExecutor();
 
     // Wrap the system output in a SubprocessOutputStream so IOExceptions
     // from system output are propagated up instead of being silently 
swallowed.
@@ -105,24 +113,31 @@ public class SubprocessExecutor {
       CompletableFuture<Void> readerFuture = 
CompletableFuture.runAsync(reader, readerService);
       readerFuture.exceptionally(errorHandler);
 
-      // Start multiple writer threads and run the tasks asynchronously.
-      MessageWriter writer = new MessageWriter(inboundQueue, messageIO, 
protocolHandler);
-      List<CompletableFuture<Void>> writerFutures = new ArrayList<>();
-      for (int i = 0; i < maxWriterThread; i++) {
-        CompletableFuture<Void> writerFuture = 
CompletableFuture.runAsync(writer, writerService);
-        writerFuture.exceptionally(errorHandler);
-        writerFutures.add(writerFuture);
+      // Start multiple message parser threads and run the tasks 
asynchronously.
+      MessageParser parser = new MessageParser(inboundQueue, outboundQueue, 
protocolHandler);
+      List<CompletableFuture<Void>> parserFutures = new ArrayList<>();
+      for (int i = 0; i < maxMsgParserThread; i++) {
+        CompletableFuture<Void> parserFuture = 
CompletableFuture.runAsync(parser, parserService);
+        parserFuture.exceptionally(errorHandler);
+        parserFutures.add(parserFuture);
       }
 
-      // Wait until the tasks finish execution. -1 means the reader (or 
writer) tasks
-      // continue the execution until finish. In cases where we don't want the 
tasks
-      // to run forever, e.g. in tests, wait for the specified timeout.
+      // Start a single writer thread and run the task asynchronously.
+      MessageWriter writer = new MessageWriter(outboundQueue, messageIO, 
blockWriteMs);
+      CompletableFuture<Void> writerFuture = 
CompletableFuture.runAsync(writer, writerService);
+      writerFuture.exceptionally(errorHandler);
+
+      // Wait until the tasks finish execution. -1 means the reader (parser, 
or writer)
+      // tasks continue the execution until finish. In cases where we don't 
want the
+      // tasks to run forever, e.g. in tests, wait for the specified timeout.
       if (timeoutMs == -1) {
         readerFuture.join();
-        CompletableFuture.allOf(writerFutures.toArray(new 
CompletableFuture[0])).join();
+        writerFuture.join();
+        CompletableFuture.allOf(parserFutures.toArray(new 
CompletableFuture[0])).join();
       } else {
         readerFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
-        CompletableFuture.allOf(writerFutures.toArray(new 
CompletableFuture[0]))
+        writerFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
+        CompletableFuture.allOf(parserFutures.toArray(new 
CompletableFuture[0]))
                          .get(timeoutMs, TimeUnit.MILLISECONDS);
       }
     } catch (IOException e) {
@@ -131,10 +146,26 @@ public class SubprocessExecutor {
   }
 
   /**
+   * Returns the outbound message queue.
+   */
+  @VisibleForTesting
+  public BlockingQueue<SubprocessResponsePB> getOutboundQueue() {
+    return outboundQueue;
+  }
+
+  /**
    * Sets the interruption flag to true.
    */
   @VisibleForTesting
   public void interrupt() {
     injectInterrupt = true;
   }
+
+  /**
+   * Blocks the message write for the given milliseconds.
+   */
+  @VisibleForTesting
+  public void blockWriteMs(long blockWriteMs) {
+    this.blockWriteMs = blockWriteMs;
+  }
 }
diff --git 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java
 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java
index fba9d89..011f4e9 100644
--- 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java
+++ 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java
@@ -77,7 +77,7 @@ public class TestMessageIO {
    * <code>IOException</code>.
    */
   @Test
-  public void testSubprocessOutputStream() throws Exception {
+  public void testSubprocessOutputStream() {
     final String data = "data";
     final SubprocessRequestPB request = 
MessageTestUtil.createEchoSubprocessRequest(data);
     final PrintStreamOverload printStreamOverload =
@@ -101,7 +101,7 @@ public class TestMessageIO {
    * bytes size should cause expected error.
    */
   @Test
-  public void testMalformedMessageExceedMaxBytes() throws Exception {
+  public void testMalformedMessageExceedMaxBytes() {
     byte[] size = 
MessageIO.intToBytes(SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT + 1);
     byte[] body = new byte[0];
     byte[] malformedMessage = Bytes.concat(size, body);
@@ -123,7 +123,7 @@ public class TestMessageIO {
    * and body (not enough data in the body) should cause expected error.
    */
   @Test
-  public void testMalformedMessageMismatchSize() throws Exception {
+  public void testMalformedMessageMismatchSize() {
     byte[] size = MessageIO.intToBytes(100);
     byte[] body = new byte[10];
     Arrays.fill(body, (byte)0);

Reply via email to