This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 622260415fe2b0d2dd5f14f8e1b0e4a03eff2662 Author: Hao Hao <[email protected]> AuthorDate: Mon Sep 30 21:47:13 2019 -0700 [java] KUDU-2971: process communicates via protobuf-based protocol This commit adds a Java tool that can communicate over a stdin/stdout pipe via a protobuf-based protocol. It is useful in cases where a Kudu process (e.g. master) needs to talk to third-party libraries written in Java. This tool has: 1) a single reader thread that continuously reads protobuf-based messages from the standard input and puts the messages to a FIFO blocking queue; 2) multiple writer threads that continuously retrieve the messages from the queue, process them and write the responses to the standard output. IOException is fatal and causes the program to exit, e.g. I/O errors when reading/writing to the pipe, and parsing malformed protobuf messages. If an InterruptedException is encountered while placing/getting messages to/from the queue, we consider it to be a signal to shut down the task, which causes the program to exit as well. To support a new protobuf message type, simply extend the 'ProtocolProcessor' interface and add the specific ProcessorMain class (similar to 'EchoProcessorMain') for the message type. 
Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982 Reviewed-on: http://gerrit.cloudera.org:8080/14329 Reviewed-by: Adar Dembo <[email protected]> Reviewed-by: Andrew Wong <[email protected]> Tested-by: Kudu Jenkins --- .../build.gradle} | 38 +++-- .../kudu/subprocess/echo/EchoProtocolHandler.java | 44 ++++++ .../kudu/subprocess/echo/EchoSubprocessMain.java} | 28 ++-- .../kudu/subprocess/echo/TestEchoSubprocess.java | 172 +++++++++++++++++++++ .../build.gradle} | 40 +++-- .../kudu/subprocess/KuduSubprocessException.java} | 34 ++-- .../java/org/apache/kudu/subprocess/MessageIO.java | 145 +++++++++++++++++ .../org/apache/kudu/subprocess/MessageReader.java | 98 ++++++++++++ .../org/apache/kudu/subprocess/MessageWriter.java | 122 +++++++++++++++ .../apache/kudu/subprocess/ProtocolHandler.java | 72 +++++++++ .../kudu/subprocess/SubprocessConfiguration.java | 118 ++++++++++++++ .../apache/kudu/subprocess/SubprocessExecutor.java | 138 +++++++++++++++++ .../kudu/subprocess/SubprocessOutputStream.java | 74 +++++++++ .../src/main/resources/log4j2.properties | 29 ++++ .../apache/kudu/subprocess/MessageTestUtil.java | 87 +++++++++++ .../org/apache/kudu/subprocess/TestMessageIO.java | 138 +++++++++++++++++ java/settings.gradle | 2 + 17 files changed, 1323 insertions(+), 56 deletions(-) diff --git a/java/settings.gradle b/java/kudu-subprocess-echo/build.gradle similarity index 52% copy from java/settings.gradle copy to java/kudu-subprocess-echo/build.gradle index a89c703..d28bf0b 100644 --- a/java/settings.gradle +++ b/java/kudu-subprocess-echo/build.gradle @@ -15,18 +15,28 @@ // specific language governing permissions and limitations // under the License. -// This file contains the configuration of the project hierarchy. -// Mainly we just define what subprojects are in the build. 
+apply from: "$rootDir/gradle/protobuf.gradle" +apply from: "$rootDir/gradle/shadow.gradle" -rootProject.name = "kudu-parent" -include "kudu-backup" -include "kudu-backup-common" -include "kudu-backup-tools" -include "kudu-client" -include "kudu-client-tools" -include "kudu-hive" -include "kudu-jepsen" -include "kudu-mapreduce" -include "kudu-spark" -include "kudu-spark-tools" -include "kudu-test-utils" +dependencies { + // Note: We don't use the shaded version to avoid protobuf message type + // got shaded and causes incompatible bounds error for type casting. + compile (project(path: ":kudu-subprocess")) + compile libs.protobufJava + compile libs.protobufJavaUtil + + optional libs.yetusAnnotations + + testCompile project(path: ":kudu-test-utils", configuration: "shadow") + testCompile project(path: ":kudu-subprocess", configuration: "test") + testCompile libs.junit + testCompile libs.log4j + testCompile libs.log4jSlf4jImpl +} +jar { + manifest { + attributes( + 'Main-Class': 'org.apache.kudu.subprocess.echo.EchoSubprocessMain' + ) + } +} diff --git a/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoProtocolHandler.java b/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoProtocolHandler.java new file mode 100644 index 0000000..a531e80 --- /dev/null +++ b/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoProtocolHandler.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.subprocess.echo; + +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.kudu.subprocess.ProtocolHandler; +import org.apache.kudu.subprocess.Subprocess.EchoRequestPB; +import org.apache.kudu.subprocess.Subprocess.EchoResponsePB; + +/** + * Class that processes a EchoRequest and simply echoes the request + * as a response. + */ +@InterfaceAudience.Private +class EchoProtocolHandler extends ProtocolHandler<EchoRequestPB, EchoResponsePB> { + + @Override + protected EchoResponsePB createResponse(EchoRequestPB request) { + EchoResponsePB.Builder respBuilder = EchoResponsePB.newBuilder(); + respBuilder.setData(request.getData()); + return respBuilder.build(); + } + + @Override + protected Class<EchoRequestPB> getRequestClass() { + return EchoRequestPB.class; + } +} diff --git a/java/settings.gradle b/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoSubprocessMain.java similarity index 62% copy from java/settings.gradle copy to java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoSubprocessMain.java index a89c703..08877b7 100644 --- a/java/settings.gradle +++ b/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoSubprocessMain.java @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. -// This file contains the configuration of the project hierarchy. -// Mainly we just define what subprojects are in the build. 
+package org.apache.kudu.subprocess.echo; -rootProject.name = "kudu-parent" -include "kudu-backup" -include "kudu-backup-common" -include "kudu-backup-tools" -include "kudu-client" -include "kudu-client-tools" -include "kudu-hive" -include "kudu-jepsen" -include "kudu-mapreduce" -include "kudu-spark" -include "kudu-spark-tools" -include "kudu-test-utils" +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.kudu.subprocess.SubprocessExecutor; + +@InterfaceAudience.Private +class EchoSubprocessMain { + + public static void main(String[] args) throws Exception { + SubprocessExecutor subprocessExecutor = new SubprocessExecutor(); + EchoProtocolHandler protocolHandler = new EchoProtocolHandler(); + subprocessExecutor.run(args, protocolHandler, /* timeoutMs= */-1); + } +} diff --git a/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java new file mode 100644 index 0000000..79610de --- /dev/null +++ b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.subprocess.echo; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.subprocess.MessageIO; +import org.apache.kudu.subprocess.MessageTestUtil; +import org.apache.kudu.subprocess.KuduSubprocessException; +import org.apache.kudu.subprocess.SubprocessExecutor; +import org.apache.kudu.test.junit.RetryRule; + +/** + * Tests for subprocess that handles EchoRequest messages in various conditions. + */ +public class TestEchoSubprocess { + private static final Logger LOG = LoggerFactory.getLogger(TestEchoSubprocess.class); + private static final Function<Throwable, Object> NO_ERR = e -> { + LOG.error(String.format("Unexpected error: %s", e.getMessage())); + Assert.assertTrue(false); + return null; + }; + private static final Function<Throwable, Object> HAS_ERR = e -> { + Assert.assertTrue(e instanceof KuduSubprocessException); + return null; + }; + + public ExpectedException thrown = ExpectedException.none(); + public RetryRule retryRule = new RetryRule(); + + // ExpectedException misbehaves when combined with other rules; we use a + // RuleChain to beat it into submission. + // + // See https://stackoverflow.com/q/28846088 for more information. 
+ @Rule + public RuleChain chain = RuleChain.outerRule(retryRule).around(thrown); + + + public static class PrintStreamWithIOException extends PrintStream { + public PrintStreamWithIOException(OutputStream out) { + super(out); + } + + @Override + public boolean checkError() { + return true; + } + } + + void runEchoSubprocess(InputStream in, + PrintStream out, + String[] args, + Function<Throwable, Object> errorHandler, + boolean injectInterrupt) + throws InterruptedException, ExecutionException, TimeoutException { + System.setIn(in); + System.setOut(out); + SubprocessExecutor subprocessExecutor = new SubprocessExecutor(errorHandler); + EchoProtocolHandler protocolProcessor = new EchoProtocolHandler(); + if (injectInterrupt) { + subprocessExecutor.interrupt(); + } + subprocessExecutor.run(args, protocolProcessor, /* timeoutMs= */1000); + } + + /** + * Parses non-malformed message should exit normally without any exceptions. + */ + @Test(expected = TimeoutException.class) + public void testBasicMsg() throws Exception { + final String message = "data"; + final byte[] messageBytes = MessageTestUtil.serializeMessage( + MessageTestUtil.createEchoSubprocessRequest(message)); + final InputStream in = new ByteArrayInputStream(messageBytes); + final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final String[] args = {""}; + runEchoSubprocess(in, out, args, NO_ERR, /* injectInterrupt= */false); + } + + /** + * Parses message with empty payload should exit normally without any exceptions. 
+ */ + @Test(expected = TimeoutException.class) + public void testMsgWithEmptyPayload() throws Exception { + final byte[] emptyPayload = MessageIO.intToBytes(0); + final InputStream in = new ByteArrayInputStream(emptyPayload); + final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final String[] args = {""}; + runEchoSubprocess(in, out, args, NO_ERR, /* injectInterrupt= */false); + } + + /** + * Parses malformed message should cause <code>IOException</code>. + */ + @Test + public void testMalformedMsg() throws Exception { + final byte[] messageBytes = "malformed".getBytes(StandardCharsets.UTF_8); + final InputStream in = new ByteArrayInputStream(messageBytes); + final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final String[] args = {""}; + thrown.expect(ExecutionException.class); + thrown.expectMessage("Unable to read the protobuf message"); + runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */false); + } + + /** + * Parses message with <code>IOException</code> injected should exit with + * <code>KuduSubprocessException</code>. + */ + @Test + public void testInjectIOException() throws Exception { + final String message = "data"; + final byte[] messageBytes = MessageTestUtil.serializeMessage( + MessageTestUtil.createEchoSubprocessRequest(message)); + final InputStream in = new ByteArrayInputStream(messageBytes); + final PrintStream out = new PrintStreamWithIOException(new ByteArrayOutputStream()); + // Only use one writer task to avoid get TimeoutException instead for + // writer tasks that haven't encountered any exceptions. + final String[] args = {"-w", "1"}; + thrown.expect(ExecutionException.class); + thrown.expectMessage("Unable to write the protobuf message"); + runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */false); + } + + /** + * Parses message with <code>InterruptedException</code> injected should exit + * with <code>KuduSubprocessException</code>. 
+ */ + @Test + public void testInjectInterruptedException() throws Exception { + final String message = "data"; + final byte[] messageBytes = MessageTestUtil.serializeMessage( + MessageTestUtil.createEchoSubprocessRequest(message)); + final InputStream in = new ByteArrayInputStream(messageBytes); + final PrintStream out = new PrintStream(new ByteArrayOutputStream()); + final String[] args = {""}; + thrown.expect(ExecutionException.class); + thrown.expectMessage("Unable to put the message to the queue"); + runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */true); + } +} diff --git a/java/settings.gradle b/java/kudu-subprocess/build.gradle similarity index 56% copy from java/settings.gradle copy to java/kudu-subprocess/build.gradle index a89c703..76dc248 100644 --- a/java/settings.gradle +++ b/java/kudu-subprocess/build.gradle @@ -15,18 +15,30 @@ // specific language governing permissions and limitations // under the License. -// This file contains the configuration of the project hierarchy. -// Mainly we just define what subprojects are in the build. +apply from: "$rootDir/gradle/protobuf.gradle" +apply from: "$rootDir/gradle/shadow.gradle" -rootProject.name = "kudu-parent" -include "kudu-backup" -include "kudu-backup-common" -include "kudu-backup-tools" -include "kudu-client" -include "kudu-client-tools" -include "kudu-hive" -include "kudu-jepsen" -include "kudu-mapreduce" -include "kudu-spark" -include "kudu-spark-tools" -include "kudu-test-utils" +dependencies { + compile libs.protobufJava + compile libs.protobufJavaUtil + compile libs.hadoopCommon + compile libs.slf4jApi + + optional libs.yetusAnnotations + + testCompile project(path: ":kudu-test-utils", configuration: "shadow") + testCompile libs.junit + testCompile libs.log4j + testCompile libs.log4jSlf4jImpl +} + +// Add protobuf files to the proto source set. 
+sourceSets { + main { + proto { + srcDir "${project.rootDir}/../src" + // Excluded any test proto files + exclude "**/*test*.proto" + } + } +} diff --git a/java/settings.gradle b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/KuduSubprocessException.java similarity index 60% copy from java/settings.gradle copy to java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/KuduSubprocessException.java index a89c703..34ff107 100644 --- a/java/settings.gradle +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/KuduSubprocessException.java @@ -15,18 +15,24 @@ // specific language governing permissions and limitations // under the License. -// This file contains the configuration of the project hierarchy. -// Mainly we just define what subprojects are in the build. +package org.apache.kudu.subprocess; -rootProject.name = "kudu-parent" -include "kudu-backup" -include "kudu-backup-common" -include "kudu-backup-tools" -include "kudu-client" -include "kudu-client-tools" -include "kudu-hive" -include "kudu-jepsen" -include "kudu-mapreduce" -include "kudu-spark" -include "kudu-spark-tools" -include "kudu-test-utils" +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Any runtime exception(s) thrown by a subprocess. + */ +@InterfaceAudience.Private +public final class KuduSubprocessException extends RuntimeException { + + /** + * Constructs a new runtime exception with the specified detail + * message and cause. 
+ * + * @param message the detail message + * @param cause the cause + */ + KuduSubprocessException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java new file mode 100644 index 0000000..ea58fac --- /dev/null +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.subprocess; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Bytes; +import com.google.protobuf.Message; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Util class for reading and writing protobuf messages. 
+ */ +@InterfaceAudience.Private +public class MessageIO { + private final int maxMessageBytes; + private final BufferedInputStream in; + private final BufferedOutputStream out; + + public MessageIO(int maxMessageBytes, + BufferedInputStream in, + BufferedOutputStream out) { + this.maxMessageBytes = maxMessageBytes; + this.in = in; + this.out = out; + } + + /** + * Reads a protobuf message, if any, from the underlying buffered input + * stream. The read is blocking and not atomic (partial reads can occur + * if exceptions occur). As such, users should ensure this is not called + * from multiple threads concurrently. + * + * @return the message in a byte array. + * @throws EOFException if the end of the stream has been reached + * @throws IOException if this input stream has been closed, an I/O + * error occurs, or fail to read the message + * properly + */ + @VisibleForTesting + byte[] readBytes() throws EOFException, IOException { + Preconditions.checkNotNull(in); + // Read four bytes of the message to get the size of the body. + byte[] sizeBytes = new byte[Integer.BYTES]; + doRead(sizeBytes, Integer.BYTES); + int size = bytesToInt(sizeBytes); + if (size > maxMessageBytes) { + throw new IOException( + String.format("message size (%d) exceeds maximum message size (%d)", + size, maxMessageBytes)); + } + // Read the body based on the size. + byte[] dataBytes = new byte[size]; + doRead(dataBytes, size); + return dataBytes; + } + + /** + * Reads <code>size</code> bytes of data from the underlying buffered input + * stream into the specified byte array, starting at the offset <code>0</code>. + * If it fails to read the specified size, <code>IOException</code> is thrown. + * + * @return the message in byte array. 
+ * @throws EOFException if the end of the stream has been reached + * @throws IOException if this input stream has been closed, an I/O + * error occurs, or fail to read the specified size + */ + private void doRead(byte bytes[], int size) throws EOFException, IOException { + Preconditions.checkNotNull(bytes); + int read = in.read(bytes, 0, size); + if (read == -1) { + throw new EOFException("the end of the stream has been reached"); + } else if (read != size) + throw new IOException( + String.format("unable to receive message, expected (%d) bytes " + + "but read (%d) bytes", size, read)); + } + + /** + * Writes a protobuf message to the buffered output stream. Since we flush + * after writing each message, with the underlying buffer size being the + * maximum bytes of a message, the write is atomic. That is if any exceptions + * occur, no partial message will be written to the underlying output stream. + * + * @param message the protobuf message + * @throws IOException if an I/O error occurs + */ + @VisibleForTesting + void writeMessage(Message message) throws IOException { + Preconditions.checkNotNull(out); + byte[] size = intToBytes(message.getSerializedSize()); + byte[] body = message.toByteArray(); + synchronized (out) { + out.write(Bytes.concat(size, body)); + // Always do a flush after write to ensure no partial message is written. + out.flush(); + } + } + + /** + * Converts a four-byte array in big endian order to a 32-bit integer. + * @param data a four-byte array in big endian order + * @return a 32-bit integer + */ + static int bytesToInt(byte[] data) { + return ByteBuffer.wrap(data) + .order(ByteOrder.BIG_ENDIAN) + .getInt(); + } + + /** + * Converts a 32-bit integer to a four bytes array in big endian order. 
+ * @param value a 32-bit integer + * @return a four bytes array in big endian order + */ + @VisibleForTesting + public static byte[] intToBytes(int value) { + return ByteBuffer.allocate(Integer.BYTES) + .order(ByteOrder.BIG_ENDIAN) + .putInt(value) + .array(); + } +} diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java new file mode 100644 index 0000000..d69ab38 --- /dev/null +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.subprocess; + +import java.io.EOFException; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link MessageReader} class, + * 1. processes a message that reads from the underlying input stream. + * 2. and then puts it to the incoming message queue. 
+ * + * Since {@link MessageIO#readBytes()} is not atomic, the implementation + * of MessageReader is not thread-safe, and thus MessageReader should not + * be called concurrently unless handled by the caller. + */ +@InterfaceAudience.Private +class MessageReader implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(MessageReader.class); + private final BlockingQueue<byte[]> inboundQueue; + private final MessageIO messageIO; + + @VisibleForTesting + private boolean injectInterrupt; + + MessageReader(BlockingQueue<byte[]> inboundQueue, + MessageIO messageIO, + boolean injectInterrupt) { + Preconditions.checkNotNull(inboundQueue); + this.inboundQueue = inboundQueue; + this.messageIO = messageIO; + this.injectInterrupt = injectInterrupt; + } + + @Override + public void run() { + // Inject InterruptedException if needed (for tests only). + if (injectInterrupt) { + Thread.currentThread().interrupt(); + } + while (true) { + // Read the message from the standard input. If fail to read the + // message properly, IOException is thrown. IOException is fatal, + // and should be propagated up the call stack. Retry on IOException + // is not necessary as the error can happen in the middle of message + // reading. + byte[] data; + try { + data = messageIO.readBytes(); + } catch (EOFException e) { + LOG.info("Reaching the end of the input stream, exiting."); + // Break the loop if the end of the stream has been reached. + break; + } catch (IOException e) { + throw new KuduSubprocessException("Unable to read the protobuf message", e); + } + + // Put the message to the queue. If encountered InterruptedException + // during the put, consider it to be fatal (as a signal to shutdown + // the task), and propagate it up the call stack. Log a warning for + // empty message which is not expected. 
+ if (data.length == 0) { + LOG.warn("Empty message received."); + continue; + } + try { + inboundQueue.put(data); + if (LOG.isDebugEnabled()) { + LOG.debug("Message: {} has been put on the queue", data); + } + } catch (InterruptedException e) { + throw new KuduSubprocessException("Unable to put the message to the queue", e); + } + } + } +} diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java new file mode 100644 index 0000000..d5afcee --- /dev/null +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.subprocess; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.WireProtocol.AppStatusPB; +import org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB; +import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB; + +/** + * The {@link MessageWriter} class, + * 1. retrieves one message from the queue at a time, + * 2. processes the message and generates a response, + * 3. and then writes the response to the underlying output stream. + */ +@InterfaceAudience.Private +class MessageWriter implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(MessageWriter.class); + private final BlockingQueue<byte[]> inboundQueue; + private final MessageIO messageIO; + private final ProtocolHandler protocolHandler; + + MessageWriter(BlockingQueue<byte[]> inboundQueue, + MessageIO messageIO, + ProtocolHandler protocolHandler) { + Preconditions.checkNotNull(inboundQueue); + this.inboundQueue = inboundQueue; + this.messageIO = messageIO; + this.protocolHandler = protocolHandler; + } + + @Override + public void run() { + while (true) { + // Take an element from the queue. If encountered InterruptedException, + // consider it to be fatal (as a signal to shutdown the task), and + // propagate it up the call stack. 
+ byte[] data; + try { + data = inboundQueue.take(); + if (LOG.isDebugEnabled()) { + LOG.debug("Message: {} has been taken from the queue", data); + } + } catch (InterruptedException e) { + throw new KuduSubprocessException("Unable to take a message from the queue", e); + } + + SubprocessResponsePB response = getResponse(data); + + // Writes the response to the underlying output stream. IOException is fatal, + // and should be propagated up the call stack. + try { + messageIO.writeMessage(response); + } catch (IOException e) { + throw new KuduSubprocessException("Unable to write the protobuf message", e); + } + } + } + + /** + * Constructs a message with the given error status. + * + * @param errorCode the given error status + * @param resp the message builder + * @return a message with the given error status + */ + static SubprocessResponsePB responseWithError(AppStatusPB.ErrorCode errorCode, + SubprocessResponsePB.Builder resp) { + Preconditions.checkNotNull(resp); + AppStatusPB.Builder errorBuilder = AppStatusPB.newBuilder(); + errorBuilder.setCode(errorCode); + resp.setError(errorBuilder); + return resp.build(); + } + + /** + * Parses the given protobuf message. If encountered InvalidProtocolBufferException, + * which indicates the message is invalid, respond with an error message. + * + * @param data the protobuf message + * @return a SubprocessResponsePB + */ + private SubprocessResponsePB getResponse(byte[] data) { + SubprocessResponsePB response; + SubprocessResponsePB.Builder responseBuilder = SubprocessResponsePB.newBuilder(); + try { + // Parses the data as a message of SubprocessRequestPB type. 
+ SubprocessRequestPB request = SubprocessRequestPB.parser().parseFrom(data); + response = protocolHandler.handleRequest(request); + } catch (InvalidProtocolBufferException e) { + LOG.warn(String.format("%s: %s", "Unable to parse the protobuf message", + new String(data, StandardCharsets.UTF_8)), e); + response = responseWithError(AppStatusPB.ErrorCode.ILLEGAL_STATE, responseBuilder); + } + return response; + } +} diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java new file mode 100644 index 0000000..6df91b5 --- /dev/null +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.subprocess; + +import com.google.common.base.Preconditions; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB; +import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB; + +/** + * Protocol that represents how to handle a protobuf message. + * + * @param <Request> The request protobuf message + * @param <Response> The response protobuf message + */ +@InterfaceAudience.Private +public abstract class ProtocolHandler<Request extends Message, + Response extends Message> { + + /** + * Processes the given SubprocessRequestPB message according to the + * request type and returns a SubprocessResponsePB message. + * + * @param request a SubprocessRequestPB protobuf message + * @return a SubprocessResponsePB message + * @throws InvalidProtocolBufferException if the protocol message being parsed is invalid + */ + SubprocessResponsePB handleRequest(SubprocessRequestPB request) + throws InvalidProtocolBufferException { + Preconditions.checkNotNull(request); + SubprocessResponsePB.Builder builder = SubprocessResponsePB.newBuilder(); + builder.setId(request.getId()); + Class<Request> requestType = getRequestClass(); + Response resp = createResponse(request.getRequest().unpack(requestType)); + builder.setResponse(Any.pack(resp)); + return builder.build(); + } + + /** + * Creates a protobuf message that responds to a request message. + * + * @param request the request message + * @return a response + */ + protected abstract Response createResponse(Request request); + + /** + * Gets the class instance of request message. 
+ * + * @return the request class instance + */ + protected abstract Class<Request> getRequestClass(); +} diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java new file mode 100644 index 0000000..bafa3ae --- /dev/null +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.subprocess; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Utility class that manages common configurations to run a subprocess. 
+ */ +@InterfaceAudience.Private +public class SubprocessConfiguration { + private int queueSize; + private static final int QUEUE_SIZE_DEFAULT = 100; + private int maxWriterThreads; + private static final int MAX_WRITER_THREADS_DEFAULT = 3; + private int maxMsgBytes; + + @VisibleForTesting + static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024; + + SubprocessConfiguration(String[] args) { + parse(args); + } + + /** + * @return the size of the message queue, or the default value if not + * provided + */ + int getQueueSize() { + return queueSize; + } + + /** + * @return the maximum number of threads in the writer thread pool, or the + * default value if not provided + */ + int getMaxWriterThreads() { + return maxWriterThreads; + } + + /** + * @return the maximum bytes of a message, or the default value if not + * provided + */ + int getMaxMessageBytes() { + return maxMsgBytes; + } + + /** + * Parses the arguments according to the specified options. + * + * @param args the subprocess arguments + * @throws KuduSubprocessException if there are any problems encountered + * while parsing the command line interface. 
+ */ + private void parse(String[] args) throws KuduSubprocessException { + Options options = new Options(); + + final String queueSizeLongOpt = "queueSize"; + Option queueSizeOpt = new Option( + "q", queueSizeLongOpt, /* hasArg= */true, + "Maximum number of messages held by the message queue"); + queueSizeOpt.setRequired(false); + options.addOption(queueSizeOpt); + + final String maxWriterThreadsLongOpt = "maxWriterThreads"; + Option maxThreadsOpt = new Option( + "w", maxWriterThreadsLongOpt, /* hasArg= */true, + "Maximum number of threads in the writer thread pool for subprocess"); + maxThreadsOpt.setRequired(false); + options.addOption(maxThreadsOpt); + + final String maxMsgBytesLongOpt = "maxMsgBytes"; + Option maxMsgOpt = new Option( + "m", maxMsgBytesLongOpt, /* hasArg= */true, + "Maximum bytes of a message for subprocess"); + maxMsgOpt.setRequired(false); + options.addOption(maxMsgOpt); + + CommandLineParser parser = new BasicParser(); + try { + CommandLine cmd = parser.parse(options, args); + String queueSize = cmd.getOptionValue(queueSizeLongOpt); + String maxWriterThreads = cmd.getOptionValue(maxWriterThreadsLongOpt); + String maxMsgBytes = cmd.getOptionValue(maxMsgBytesLongOpt); + this.queueSize = queueSize == null ? + QUEUE_SIZE_DEFAULT : Integer.parseInt(queueSize); + this.maxWriterThreads = maxWriterThreads == null ? + MAX_WRITER_THREADS_DEFAULT : Integer.parseInt(maxWriterThreads); + this.maxMsgBytes = maxMsgBytes == null ? 
+ MAX_MESSAGE_BYTES_DEFAULT : Integer.parseInt(maxMsgBytes); + } catch (ParseException e) { + throw new KuduSubprocessException("Cannot parse the subprocess command line", e); + } + } +} diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java new file mode 100644 index 0000000..e3645a0 --- /dev/null +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.subprocess; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link SubprocessExecutor} class, + * 1. parses the command line to get the configuration, + * 2. has a single reader thread that continuously reads protobuf-based + * messages from the standard input and puts the messages to a FIFO + * blocking queue, + * 3. has multiple writer threads that continuously retrieve the messages + * from the queue, process them and write the responses to the + * standard output. + */ +@InterfaceAudience.Private +public class SubprocessExecutor { + private static final Logger LOG = LoggerFactory.getLogger(SubprocessExecutor.class); + private final Function<Throwable, Object> errorHandler; + private boolean injectInterrupt = false; + + public SubprocessExecutor() { + errorHandler = (t) -> { + // Exit the program with a nonzero status code if unexpected exception(s) + // thrown by the reader or writer tasks. + System.exit(1); + return null; + }; + } + + @VisibleForTesting + public SubprocessExecutor(Function<Throwable, Object> errorHandler) { + this.errorHandler = errorHandler; + } + + /** + * Executes the subprocess with the given arguments and protocol processor. 
+ * + * @param args the subprocess arguments + * @param protocolHandler the subprocess protocol handler + * @param timeoutMs the maximum time to wait for subprocess tasks to finish, -1 means + * no timeout and the tasks will continue executing until they finish + * @throws ExecutionException if any tasks of the subprocess completed exceptionally + * @throws InterruptedException if the current thread was interrupted while waiting + * @throws TimeoutException if the wait timed out + */ + @VisibleForTesting + public void run(String[] args, ProtocolHandler protocolHandler, long timeoutMs) + throws InterruptedException, ExecutionException, TimeoutException { + SubprocessConfiguration conf = new SubprocessConfiguration(args); + int maxWriterThread = conf.getMaxWriterThreads(); + int queueSize = conf.getQueueSize(); + int maxMessageBytes = conf.getMaxMessageBytes(); + + BlockingQueue<byte[]> inboundQueue = new ArrayBlockingQueue<>(queueSize, /* fair= */true); + ExecutorService readerService = Executors.newSingleThreadExecutor(); + ExecutorService writerService = Executors.newFixedThreadPool(maxWriterThread); + + // Wrap the system output in a SubprocessOutputStream so IOExceptions + // from system output are propagated up instead of being silently swallowed. + // Note that the BufferedOutputStream is initiated with the maximum bytes of + // a message to ensure the underlying buffer can hold the entire message before + // flushing. + try (BufferedInputStream in = new BufferedInputStream(System.in); + BufferedOutputStream out = new BufferedOutputStream( + new SubprocessOutputStream(System.out), maxMessageBytes)) { + MessageIO messageIO = new MessageIO(maxMessageBytes, in, out); + + // Start a single reader thread and run the task asynchronously. 
+ MessageReader reader = new MessageReader(inboundQueue, messageIO, injectInterrupt); + CompletableFuture readerFuture = CompletableFuture.runAsync(reader, readerService); + readerFuture.exceptionally(errorHandler); + + // Start multiple writer threads and run the tasks asynchronously. + MessageWriter writer = new MessageWriter(inboundQueue, messageIO, protocolHandler); + CompletableFuture[] writerFutures = new CompletableFuture[maxWriterThread]; + for (int i = 0; i < maxWriterThread; i++) { + CompletableFuture writerFuture = CompletableFuture.runAsync(writer, writerService); + writerFuture.exceptionally(errorHandler); + writerFutures[i] = writerFuture; + } + + // Wait until the tasks finish execution. -1 means the reader (or writer) tasks + // continue the execution until finish. In cases where we don't want the tasks + // to run forever, e.g. in tests, wait for the specified timeout. + if (timeoutMs == -1) { + readerFuture.join(); + CompletableFuture.allOf(writerFutures).join(); + } else { + readerFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + CompletableFuture.allOf(writerFutures) + .get(timeoutMs, TimeUnit.MILLISECONDS); + } + } catch (IOException e) { + LOG.error("Unable to close the underlying stream", e); + } + } + + /** + * Sets the interruption flag to true. + */ + @VisibleForTesting + public void interrupt() { + injectInterrupt = true; + } +} diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessOutputStream.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessOutputStream.java new file mode 100644 index 0000000..e3ce7e0 --- /dev/null +++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessOutputStream.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.subprocess; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Wrapper around {@link java.io.PrintStream} that throws an + * <code>IOException</code> instead of relying on explicit + * <code>checkError</code> calls when writing or flushing to + * the stream. This makes its error-throwing behavior more + * similar to most {@link java.io.OutputStream}. 
+ */ +public class SubprocessOutputStream extends OutputStream { + private final PrintStream out; + + @VisibleForTesting + public static final String WRITE_ERR = "Unable to write to print stream"; + private static final String FLUSH_ERR = "Unable to flush to print stream"; + + public SubprocessOutputStream(PrintStream out) { + this.out = out; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + if (out.checkError()) { + throw new IOException(WRITE_ERR); + } + } + + @Override + public void write(byte buf[], int off, int len) throws IOException { + out.write(buf, off, len); + if (out.checkError()) { + throw new IOException(WRITE_ERR); + } + } + + @Override + public void write(byte b[]) throws IOException { + out.write(b); + if (out.checkError()) { + throw new IOException(WRITE_ERR); + } + } + + @Override + public void flush() throws IOException { + if (out.checkError()) { + throw new IOException(FLUSH_ERR); + } + } +} diff --git a/java/kudu-subprocess/src/main/resources/log4j2.properties b/java/kudu-subprocess/src/main/resources/log4j2.properties new file mode 100644 index 0000000..b3459ad --- /dev/null +++ b/java/kudu-subprocess/src/main/resources/log4j2.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +status = error +name = PropertiesConfig +appenders = console + +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n + +rootLogger.level = info +rootLogger.appenderRefs = stdout +rootLogger.appenderRef.stdout.ref = STDOUT diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTestUtil.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTestUtil.java new file mode 100644 index 0000000..932743a --- /dev/null +++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTestUtil.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.subprocess; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; + +import org.apache.kudu.subprocess.Subprocess.EchoRequestPB; +import org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB; + +/** + * Utility class of common functions used for testing subprocess + * message processing. + */ +public class MessageTestUtil { + + /** + * Constructs a SubprocessRequestPB message of echo request with the + * given payload. + * + * @param payload the message payload + * @return a SubprocessRequestPB message + */ + public static SubprocessRequestPB createEchoSubprocessRequest(String payload) { + SubprocessRequestPB.Builder builder = SubprocessRequestPB.newBuilder(); + EchoRequestPB.Builder echoBuilder = EchoRequestPB.newBuilder(); + echoBuilder.setData(payload); + builder.setRequest(Any.pack(echoBuilder.build())); + return builder.build(); + } + + /** + * Serializes the given message to a byte array. + * + * @param message the message + * @return a serialized message in byte array + * @throws IOException if an I/O error occurs + */ + public static byte[] serializeMessage(Message message) throws IOException { + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + MessageIO messageIO = new MessageIO( + SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT, + /* in= */null, new BufferedOutputStream(byteOutputStream)); + messageIO.writeMessage(message); + return byteOutputStream.toByteArray(); + } + + /** + * De-serializes the given message in byte array. 
+ * + * @param bytes the serialized message in byte array + * @param parser the parser for the message + * @return a message + * @throws IOException if an I/O error occurs + */ + static <T extends Message> T deserializeMessage(byte[] bytes, Parser<T> parser) + throws IOException { + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + MessageIO messageIO = new MessageIO( + SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT, + new BufferedInputStream(inputStream), /* out= */null); + byte[] data = messageIO.readBytes(); + return parser.parseFrom(data); + } +} diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java new file mode 100644 index 0000000..abedf2b --- /dev/null +++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.subprocess; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Arrays; + +import com.google.common.primitives.Bytes; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; + +import org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB; +import org.apache.kudu.test.junit.RetryRule; + +/** + * Tests for reading and writing protobuf message. + */ +public class TestMessageIO { + + public ExpectedException thrown = ExpectedException.none(); + public RetryRule retryRule = new RetryRule(); + + // ExpectedException misbehaves when combined with other rules; we use a + // RuleChain to beat it into submission. + // + // See https://stackoverflow.com/q/28846088 for more information. + @Rule + public RuleChain chain = RuleChain.outerRule(retryRule).around(thrown); + + public static class PrintStreamOverload extends PrintStream { + public PrintStreamOverload(OutputStream out) { + super(out); + } + + /** + * Expands the visibility of setError() for the tests to call it. + */ + @Override + public void setError() { + super.setError(); + } + } + + /** + * Serializes a subprocess message that wraps EchoRequestPB and de-serializes + * it to verify the content. 
+ */ + @Test + public void testBasicEchoMessage() throws Exception { + final String data = "data"; + final SubprocessRequestPB request = MessageTestUtil.createEchoSubprocessRequest(data); + final byte[] message = MessageTestUtil.serializeMessage(request); + final SubprocessRequestPB actualRequest = MessageTestUtil.deserializeMessage( + message, SubprocessRequestPB.parser()); + Assert.assertEquals(request, actualRequest); + } + + /** + * Verifies that writing messages via <code>SubprocessOutputStream</code> can + * catch errors thrown from underlying <code>PrintStream</code> and re-throws + * <code>IOException</code>. + */ + @Test + public void testSubprocessOutputStream() throws Exception { + final String data = "data"; + final SubprocessRequestPB request = MessageTestUtil.createEchoSubprocessRequest(data); + final PrintStreamOverload printStreamOverload = + new PrintStreamOverload(new ByteArrayOutputStream()); + final BufferedOutputStream out = new BufferedOutputStream( + new SubprocessOutputStream(printStreamOverload)); + final MessageIO messageIO = new MessageIO( + SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT, /* in= */null, out); + thrown.expect(IOException.class); + thrown.expectMessage(SubprocessOutputStream.WRITE_ERR); + printStreamOverload.setError(); + messageIO.writeMessage(request); + } + + /** + * Verifies that reading malformed message that exceeds the maximum + * bytes size should cause expected error. 
+ */ + @Test + public void testMalformedMessageExceedMaxBytes() throws Exception { + byte[] size = MessageIO.intToBytes(SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT + 1); + byte[] body = new byte[0]; + byte[] malformedMessage = Bytes.concat(size, body); + ByteArrayInputStream byteInputStream = new ByteArrayInputStream(malformedMessage); + BufferedInputStream in = new BufferedInputStream(byteInputStream); + MessageIO messageIO = new MessageIO(SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT, + in, /* out= */null); + thrown.expect(IOException.class); + thrown.expectMessage("exceeds maximum message size"); + messageIO.readBytes(); + } + + /** + * Verifies that reading a malformed message that has a mismatched size + * and body (not enough data in the body) causes the expected error. + */ + @Test + public void testMalformedMessageMismatchSize() throws Exception { + byte[] size = MessageIO.intToBytes(100); + byte[] body = new byte[10]; + Arrays.fill(body, (byte)0); + byte[] malformedMessage = Bytes.concat(size, body); + BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(malformedMessage)); + MessageIO messageIO = new MessageIO(SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT, + in, /* out= */null); + thrown.expect(IOException.class); + thrown.expectMessage("unable to receive message"); + messageIO.readBytes(); + } +} diff --git a/java/settings.gradle b/java/settings.gradle index a89c703..e48c62e 100644 --- a/java/settings.gradle +++ b/java/settings.gradle @@ -29,4 +29,6 @@ include "kudu-jepsen" include "kudu-mapreduce" include "kudu-spark" include "kudu-spark-tools" +include "kudu-subprocess" +include "kudu-subprocess-echo" include "kudu-test-utils"
