Repository: oozie
Updated Branches:
  refs/heads/master 7ef418f7f -> 8013a945f


OOZIE-3382 [SSH action] [SSH action] Optimize process streams draining 
(asalamon74 via andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8013a945
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8013a945
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8013a945

Branch: refs/heads/master
Commit: 8013a945f16b5a10ad1fdd898ea10fcdb838117e
Parents: 7ef418f
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Sat Dec 1 13:30:12 2018 +0100
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Sat Dec 1 13:30:12 2018 +0100

----------------------------------------------------------------------
 .../oozie/action/ssh/SshActionExecutor.java     | 125 +------
 .../org/apache/oozie/util/BufferDrainer.java    | 164 +++++++++
 .../apache/oozie/util/BlockingInputStream.java  | 134 +++++++
 .../util/BlockingWritesExitValueProcess.java    | 132 +++++++
 .../org/apache/oozie/util/DrainerTestCase.java  |  76 ++++
 .../oozie/util/TestBlockingInputStream.java     | 153 ++++++++
 .../TestBlockingWritesExitValueProcess.java     |  65 ++++
 .../apache/oozie/util/TestBufferDrainer.java    | 363 +++++++++++++++++++
 release-log.txt                                 |   1 +
 9 files changed, 1106 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
index 3e0e3c5..1e37e80 100644
--- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
@@ -41,6 +41,7 @@ import org.apache.oozie.service.CallbackService;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.servlet.CallbackServlet;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.util.BufferDrainer;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.PropertiesUtils;
 import org.apache.oozie.util.XLog;
@@ -146,11 +147,11 @@ public class SshActionExecutor extends ActionExecutor {
                 LOG.debug("Ssh command [{0}]", dataCommand);
                 try {
                     final Process process = 
Runtime.getRuntime().exec(dataCommand.split("\\s"));
-
-                    final StringBuffer outBuffer = new StringBuffer();
-                    final StringBuffer errBuffer = new StringBuffer();
+                    final BufferDrainer bufferDrainer = new 
BufferDrainer(process, maxLen);
+                    bufferDrainer.drainBuffers();
+                    final StringBuffer outBuffer = 
bufferDrainer.getInputBuffer();
+                    final StringBuffer errBuffer = 
bufferDrainer.getErrorBuffer();
                     boolean overflow = false;
-                    drainBuffers(process, outBuffer, errBuffer, maxLen);
                     LOG.trace("outBuffer={0}", outBuffer);
                     LOG.trace("errBuffer={0}", errBuffer);
                     if (outBuffer.length() > maxLen) {
@@ -306,11 +307,11 @@ public class SshActionExecutor extends ActionExecutor {
         String outFile = getRemoteFileName(context, action, "pid", false, 
false);
         String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile;
         try {
-            Process process = 
Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
-            StringBuffer buffer = new StringBuffer();
-            drainBuffers(process, buffer, null, maxLen);
+            final Process process = 
Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
+            final BufferDrainer bufferDrainer = new BufferDrainer(process, 
maxLen);
+            bufferDrainer.drainBuffers();
+            final StringBuffer buffer = bufferDrainer.getInputBuffer();
             String pid = getFirstLine(buffer);
-
             if (Long.valueOf(pid) > 0) {
                 return pid;
             }
@@ -358,8 +359,9 @@ public class SshActionExecutor extends ActionExecutor {
         Runtime runtime = Runtime.getRuntime();
         Process p = runtime.exec(command.split("\\s"));
 
-        StringBuffer errorBuffer = new StringBuffer();
-        int exitValue = drainBuffers(p, null, errorBuffer, maxLen);
+        final BufferDrainer bufferDrainer = new BufferDrainer(p, maxLen);
+        final int exitValue = bufferDrainer.drainBuffers();
+        final StringBuffer errorBuffer = bufferDrainer.getErrorBuffer();
 
         if (exitValue != 0) {
             String error = getTruncatedString(errorBuffer);
@@ -447,12 +449,11 @@ public class SshActionExecutor extends ActionExecutor {
         LOG.trace("Executing SSH command [finalCommand={0}]", 
Arrays.toString(finalCommand));
         final Process p = runtime.exec(finalCommand);
 
-        final StringBuffer inputBuffer = new StringBuffer();
-        final StringBuffer errorBuffer = new StringBuffer();
-        final int exitValue = drainBuffers(p, inputBuffer, errorBuffer, 
maxLen);
-
+        BufferDrainer bufferDrainer = new BufferDrainer(p, maxLen);
+        final int exitValue = bufferDrainer.drainBuffers();
+        final StringBuffer inputBuffer = bufferDrainer.getInputBuffer();
+        final StringBuffer errorBuffer = bufferDrainer.getErrorBuffer();
         final String pid = getFirstLine(inputBuffer);
-
         if (exitValue != 0) {
             String error = getTruncatedString(errorBuffer);
             throw new IOException(XLog.format("Not able to execute ssh-base.sh 
on {0}", host) + " | " + "ErrorStream: "
@@ -504,7 +505,8 @@ public class SshActionExecutor extends ActionExecutor {
         Process ps = null;
         try {
             ps = Runtime.getRuntime().exec(command.split("\\s"));
-            returnValue = drainBuffers(ps, null, null, 0);
+            final BufferDrainer bufferDrainer = new BufferDrainer(ps, 0);
+            returnValue = bufferDrainer.drainBuffers();
         }
         catch (IOException e) {
             throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, 
"FAILED_OPERATION", XLog.format(
@@ -729,97 +731,6 @@ public class SshActionExecutor extends ActionExecutor {
     }
 
     /**
-     * Drains the inputStream and errorStream of the Process being executed. 
The contents of the streams are stored if a
-     * buffer is provided for the stream.
-     *
-     * @param p The Process instance.
-     * @param inputBuffer The buffer into which STDOUT is to be read. Can be 
null if only draining is required.
-     * @param errorBuffer The buffer into which STDERR is to be read. Can be 
null if only draining is required.
-     * @param maxLength The maximum data length to be stored in these buffers. 
This is an indicative value, and the
-     * store content may exceed this length.
-     * @return the exit value of the processSettings.
-     * @throws IOException
-     */
-    private int drainBuffers(final Process p, final StringBuffer inputBuffer, 
final StringBuffer errorBuffer, final int maxLength)
-            throws IOException {
-        LOG.trace("drainBuffers() start");
-
-        int exitValue = -1;
-
-        int inBytesRead = 0;
-        int errBytesRead = 0;
-
-        boolean processEnded = false;
-
-        try (final BufferedReader ir = new BufferedReader(new 
InputStreamReader(p.getInputStream(), Charsets.UTF_8));
-             final BufferedReader er = new BufferedReader(new 
InputStreamReader(p.getErrorStream(), Charsets.UTF_8))) {
-            // Here we do some kind of busy waiting, checking whether the 
process has finished by calling Process#exitValue().
-            // If not yet finished, an IllegalThreadStateException is thrown 
and ignored, the progress on stdout and stderr read,
-            // and retried until the process has ended.
-            // Note that Process#waitFor() may block sometimes, that's why we 
do a polling mechanism using Process#exitValue()
-            // instead. Until we extend unit and integration test coverage for 
SSH action, and we can introduce a more sophisticated
-            // error handling based on the extended coverage, this solution 
should stay in place.
-            while (!processEnded) {
-                try {
-                    // Doesn't block but throws IllegalThreadStateException if 
the process hasn't finished yet
-                    exitValue = p.exitValue();
-                    processEnded = true;
-                }
-                catch (final IllegalThreadStateException itse) {
-                    // Continue to drain
-                }
-
-                // Drain input and error streams
-                inBytesRead += drainBuffer(ir, inputBuffer, maxLength, 
inBytesRead, processEnded);
-                errBytesRead += drainBuffer(er, errorBuffer, maxLength, 
errBytesRead, processEnded);
-
-                // Necessary evil: sleep and retry
-                if (!processEnded) {
-                    try {
-                        Thread.sleep(500);
-                    }
-                    catch (final InterruptedException ie) {
-                        // Sleep a little, then check again
-                    }
-                }
-            }
-        }
-
-        LOG.trace("drainBuffers() end [exitValue={0}]", exitValue);
-
-        return exitValue;
-    }
-
-    /**
-     * Reads the contents of a stream and stores them into the provided buffer.
-     *
-     * @param br The stream to be read.
-     * @param storageBuf The buffer into which the contents of the stream are 
to be stored.
-     * @param maxLength The maximum number of bytes to be stored in the 
buffer. An indicative value and may be
-     * exceeded.
-     * @param bytesRead The number of bytes read from this stream to date.
-     * @param readAll If true, the stream is drained while their is data 
available in it. Otherwise, only a single chunk
-     * of data is read, irrespective of how much is available.
-     * @return bReadSession returns drainBuffer for stream of contents
-     * @throws IOException
-     */
-    private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int 
maxLength, int bytesRead, boolean readAll)
-            throws IOException {
-        int bReadSession = 0;
-        if (br.ready()) {
-            char[] buf = new char[1024];
-            do {
-                int bReadCurrent = br.read(buf, 0, 1024);
-                if (storageBuf != null && bytesRead < maxLength) {
-                    storageBuf.append(buf, 0, bReadCurrent);
-                }
-                bReadSession += bReadCurrent;
-            } while (br.ready() && readAll);
-        }
-        return bReadSession;
-    }
-
-    /**
      * Returns the first line from a StringBuffer, recognized by the new line 
character \n.
      *
      * @param buffer The StringBuffer from which the first line is required.

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/main/java/org/apache/oozie/util/BufferDrainer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/BufferDrainer.java 
b/core/src/main/java/org/apache/oozie/util/BufferDrainer.java
new file mode 100644
index 0000000..304fd6d
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/BufferDrainer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/** Drains the standard output and standard error of a {@link Process} into buffers while polling for its exit value. */
+public class BufferDrainer {
+
+    private final XLog LOG = XLog.getLog(getClass());
+    private static final int DRAIN_BUFFER_SLEEP_TIME_MS = 500;
+    private final Process process;
+    private final int maxLength;
+    private boolean drainBuffersFinished;
+    private final StringBuffer inputBuffer;
+    private final StringBuffer errorBuffer;
+
+    /**
+     * @param process The Process instance.
+     * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
+     * stored content may exceed this length.
+     */
+    public BufferDrainer(Process process, int maxLength) {
+        this.process = process;
+        this.maxLength = maxLength;
+        drainBuffersFinished = false;
+        inputBuffer = new StringBuffer();
+        errorBuffer = new StringBuffer();
+    }
+
+    /**
+     * Drains the inputStream and errorStream of the Process being executed until the process has ended. The contents of
+     * the streams are stored in the internal buffers exposed by {@link #getInputBuffer()} and {@link #getErrorBuffer()}.
+     *
+     * @return the exit value of the process.
+     * @throws IOException if reading the process streams fails
+     * @throws IllegalStateException if draining has already been finished on this instance
+     */
+    public int drainBuffers() throws IOException {
+        if (drainBuffersFinished) {
+            throw new IllegalStateException("Buffer draining has already been finished");
+        }
+        LOG.trace("drainBuffers() start");
+
+        int exitValue = -1;
+
+        int inBytesRead = 0;
+        int errBytesRead = 0;
+
+        boolean processEnded = false;
+
+        try (final BufferedReader ir = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.UTF_8));
+             final BufferedReader er = new BufferedReader(new InputStreamReader(process.getErrorStream(), Charsets.UTF_8))) {
+            // Here we do some kind of busy waiting, checking whether the process has finished by calling Process#exitValue().
+            // If not yet finished, an IllegalThreadStateException is thrown and ignored, the progress on stdout and stderr read,
+            // and retried until the process has ended.
+            // Note that Process#waitFor() may block sometimes, that's why we do a polling mechanism using Process#exitValue()
+            // instead. Until we extend unit and integration test coverage for SSH action, and we can introduce a more
+            // sophisticated error handling based on the extended coverage, this solution should stay in place.
+            while (!processEnded) {
+                try {
+                    // Doesn't block but throws IllegalThreadStateException if the process hasn't finished yet
+                    exitValue = process.exitValue();
+                    processEnded = true;
+                }
+                catch (final IllegalThreadStateException itse) {
+                    // Continue to drain
+                }
+
+                // Drain input and error streams
+                inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
+                errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
+
+                // Necessary evil: sleep and retry
+                if (!processEnded) {
+                    try {
+                        LOG.trace("Sleeping {0}ms during buffer draining", DRAIN_BUFFER_SLEEP_TIME_MS);
+                        Thread.sleep(DRAIN_BUFFER_SLEEP_TIME_MS);
+                    }
+                    catch (final InterruptedException ie) {
+                        // Interruption is deliberately ignored: keep polling until the process has ended
+                    }
+                }
+            }
+        }
+
+        LOG.trace("drainBuffers() end [exitValue={0}]", exitValue);
+        drainBuffersFinished = true;
+        return exitValue;
+    }
+
+    /** Returns the buffer filled from the process' input stream; fails unless {@link #drainBuffers()} has completed. */
+    public StringBuffer getInputBuffer() {
+        if (!drainBuffersFinished) {
+            throw new IllegalStateException("Buffer draining has not been finished yet");
+        }
+        return inputBuffer;
+    }
+
+    /** Returns the buffer filled from the process' error stream; fails unless {@link #drainBuffers()} has completed. */
+    public StringBuffer getErrorBuffer() {
+        if (!drainBuffersFinished) {
+            throw new IllegalStateException("Buffer draining has not been finished yet");
+        }
+        return errorBuffer;
+    }
+
+    /**
+     * Reads the contents of a stream and stores them into the provided buffer.
+     *
+     * @param br The stream to be read.
+     * @param storageBuf The buffer into which the contents of the stream are to be stored. Can be null to only drain.
+     * @param maxLength The maximum number of characters to be stored in the buffer. An indicative value and may be
+     * exceeded.
+     * @param bytesRead The number of characters read from this stream to date.
+     * @param readAll If true, the stream is drained while there is data available in it. Otherwise, reading also stops
+     * after the first chunk that is not completely filled.
+     * @return the number of characters read from the stream during this call
+     * @throws IOException if reading from the stream fails
+     */
+    @VisibleForTesting
+    static int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll)
+            throws IOException {
+        int bReadSession = 0;
+        final int chunkSize = 1024;
+        if (br.ready()) {
+            char[] buf = new char[chunkSize];
+            int bReadCurrent;
+            boolean wantsToReadFurther;
+            do {
+                bReadCurrent = br.read(buf, 0, chunkSize);
+                if (storageBuf != null && bytesRead < maxLength && bReadCurrent != -1) {
+                    storageBuf.append(buf, 0, bReadCurrent);
+                }
+                if (bReadCurrent != -1) {
+                    bReadSession += bReadCurrent;
+                }
+                wantsToReadFurther = bReadCurrent != -1 && (readAll || bReadCurrent == chunkSize);
+            } while (br.ready() && wantsToReadFurther);
+        }
+        return bReadSession;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java 
b/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java
new file mode 100644
index 0000000..3472bc6
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.ByteArrayInputStream;
+import java.util.Random;
+
+/**
+ * A {@code ByteArrayInputStream} where the full data is not available at the 
beginning but produced on the fly. Simulates
+ * an external process generating the data.
+ *
+ * <p>
+ * {@code simulateSlowWriting} simulates when the process writes data in 
chunks instead of writing all the data
+ * at the beginning.
+ * <p>
+ * It is possible to specify the pause times before writing the next chunk.
+ * <p>
+ * It is possible to specify {@code bufferSize}; if the buffer contains more unread data, the process reports
+ * itself as running.
+ *
+ */
+public class BlockingInputStream extends ByteArrayInputStream {
+    private static final int CHUNK_WRITE_DELAY_MS = 500;
+    private static final double CHUNK_WRITE_MIN_RATIO = 0.25;
+    private int readPosition;
+    private int writePosition;
+    private boolean simulateSlowWriting;
+    private int bufferSize;
+    private long lastTimeDataWritten;
+    private int pauseIndex;
+    private int[] pauseTimes = {CHUNK_WRITE_DELAY_MS};
+    private boolean failed;
+
+    BlockingInputStream(byte buf[], boolean simulateSlowWriting) {
+        super(buf);
+        this.bufferSize = buf.length;
+        this.simulateSlowWriting = simulateSlowWriting;
+        writeFirstChunk(buf, simulateSlowWriting);
+    }
+
+    void setBufferSize(int bufferSize) {
+        if (!simulateSlowWriting) {
+            throw new IllegalArgumentException("Cannot specify bufferSize");
+        }
+        this.bufferSize = bufferSize;
+    }
+
+    void setPauseTimes(int[] pauseTimes) {
+        this.pauseTimes = pauseTimes;
+    }
+
+    private void writeFirstChunk(byte[] buf, boolean simulateSlowWriting) {
+        if (simulateSlowWriting) {
+            writeNextChunk(0);
+        }
+        else {
+            writeNextChunk(buf.length);
+        }
+    }
+
+    @Override
+    public synchronized int read(byte b[], int off, int len) {
+        final int bytesReadyToRead = writePosition - readPosition;
+        final int bytesToRead = Math.min(len, bytesReadyToRead);
+        final int readBytes = super.read(b, off, bytesToRead);
+        final boolean someBytesRead = readBytes != -1;
+        if (someBytesRead) {
+            readPosition += readBytes;
+        }
+        return readBytes;
+    }
+
+    @Override
+    public synchronized int available() {
+        return writePosition - readPosition;
+    }
+
+    @VisibleForTesting
+    boolean checkBlockedAndTryWriteNextChunk() {
+        boolean fullBufferReady = writePosition == buf.length;
+        boolean readFullBuffer = readPosition == buf.length;
+        boolean thereAreNoBytesInBuffer = writePosition - readPosition == 0;
+        boolean isBlocked = failed || (!readFullBuffer && (!fullBufferReady || 
thereAreNoBytesInBuffer ));
+        tryWriteNextChunk();
+        return isBlocked;
+    }
+
+    private boolean readyToWriteNextChunk() {
+        return System.currentTimeMillis() - lastTimeDataWritten > 
getPauseTimeMs(pauseIndex);
+    }
+
+    private int getPauseTimeMs(int index) {
+        return pauseTimes[index % pauseTimes.length];
+    }
+
+    private void tryWriteNextChunk() {
+        if (!failed && writePosition < buf.length && available() < bufferSize 
&& readyToWriteNextChunk()) {
+            int maxSize = Math.min(bufferSize - available(), buf.length - 
writePosition);
+            int minSize = Math.min((int)(CHUNK_WRITE_MIN_RATIO * buf.length 
+1), maxSize);
+            int nextChunkSize = new Random().nextInt((maxSize - minSize) + 1) 
+ minSize;
+            writeNextChunk(nextChunkSize);
+        }
+    }
+
+    private void writeNextChunk(int chunkSize) {
+        writePosition += chunkSize;
+        lastTimeDataWritten = System.currentTimeMillis();
+        if (chunkSize > 0) {
+            ++pauseIndex;
+        }
+    }
+
+    void simulateFailure() {
+        failed = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java 
b/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java
new file mode 100644
index 0000000..116cab3
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simulates different processes writing to standard output / error.
+ * <p>
+ * {@code exitValue()} throws {@code IllegalThreadStateException} while the buffers are blocked.
+ * <p>
+ * Uses {@code BlockingInputStream}
+ */
+public class BlockingWritesExitValueProcess extends Process {
+    static final int EXIT_VALUE = 1;
+
+    private BlockingInputStream inputStream;
+    private BlockingInputStream errorStream;
+    private String outputString;
+    private String errorString;
+    private int bufferSize = Integer.MAX_VALUE;
+    private boolean simulateSlowWriting;
+    private int[] outputPauseTimes;
+    private int[] errorPauseTimes;
+
+    private BlockingWritesExitValueProcess(String outputString, String 
errorString) {
+        this.outputString = outputString;
+        this.errorString = errorString;
+    }
+
+    static BlockingWritesExitValueProcess createFastWritingProcess(String 
outputString, String errorString) {
+        BlockingWritesExitValueProcess process = new 
BlockingWritesExitValueProcess(outputString, errorString);
+        process.generateStreams();
+        return process;
+    }
+
+    static BlockingWritesExitValueProcess createBufferLimitedProcess(String 
outputString, String errorString, int bufferSize) {
+        BlockingWritesExitValueProcess process = new 
BlockingWritesExitValueProcess(outputString, errorString);
+        process.simulateSlowWriting = true;
+        process.bufferSize = bufferSize;
+        process.generateStreams();
+        return process;
+    }
+
+    static BlockingWritesExitValueProcess createPausedProcess(String 
outputString, String errorString, int[] pauseTimes) {
+        BlockingWritesExitValueProcess process = new 
BlockingWritesExitValueProcess(outputString, errorString);
+        process.simulateSlowWriting = true;
+        process.outputPauseTimes = pauseTimes;
+        process.errorPauseTimes = pauseTimes;
+        process.generateStreams();
+        return process;
+    }
+
+    private void generateStreams() {
+        byte []outputByteArray = outputString.getBytes(StandardCharsets.UTF_8);
+        byte []errorByteArray = errorString.getBytes(StandardCharsets.UTF_8);
+        inputStream = new BlockingInputStream(outputByteArray, 
simulateSlowWriting);
+        if (simulateSlowWriting) {
+            inputStream.setBufferSize(bufferSize);
+        }
+        if (outputPauseTimes != null) {
+            inputStream.setPauseTimes(outputPauseTimes);
+        }
+        errorStream = new BlockingInputStream(errorByteArray, 
simulateSlowWriting);
+        if (simulateSlowWriting) {
+            errorStream.setBufferSize(bufferSize);
+        }
+        if (errorPauseTimes != null) {
+            errorStream.setPauseTimes(errorPauseTimes);
+        }
+    }
+
+
+    @Override
+    public OutputStream getOutputStream() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InputStream getInputStream() {
+        return inputStream;
+    }
+
+    @Override
+    public InputStream getErrorStream() {
+        return errorStream;
+    }
+
+    @Override
+    public int waitFor() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int exitValue() {
+        boolean inputStreamBlocked = 
inputStream.checkBlockedAndTryWriteNextChunk();
+        boolean errorStreamBlocked = 
errorStream.checkBlockedAndTryWriteNextChunk();
+        if (inputStreamBlocked || errorStreamBlocked) {
+            throw new IllegalThreadStateException("Process is still running");
+        }
+        return EXIT_VALUE;
+    }
+
+    @Override
+    public void destroy() {
+        throw new UnsupportedOperationException();
+    }
+
+    void simulateFailure() {
+        inputStream.simulateFailure();
+        errorStream.simulateFailure();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java 
b/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java
new file mode 100644
index 0000000..2221e61
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DrainerTestCase {
+
+    private static final String HEX_CHARACTERS = "0123456789ABCDEF";
+    private static final List<String> sampleStrings = new ArrayList<>();
+    private static final int[] suggestedMaxLengthsToCheck = {1, 1024, 
1024*1024};
+    private static final int[] stringLengthsToCheck = {0, 16, 4096, 1024*1024, 
20*1024*1024};
+
+    @BeforeClass
+    public static void setUpBeforeClass() {
+        if (sampleStrings.isEmpty()) {
+            generateSamples();
+        }
+    }
+
+    private static void generateSamples() {
+        for (int length : stringLengthsToCheck) {
+            sampleStrings.add(generateString(length));
+        }
+    }
+
+    static String generateString(int length) {
+        StringBuffer sb = new StringBuffer();
+        for (int i=0; i<length/HEX_CHARACTERS.length(); ++i) {
+            sb.append(HEX_CHARACTERS);
+        }
+        sb.append(HEX_CHARACTERS.substring(0,length % 
HEX_CHARACTERS.length()));
+        return sb.toString();
+    }
+
+    static abstract class StringAndIntProcessingCallback {
+        public abstract void call(String sampleString, int suggestedMaxLength) 
throws Exception;
+    }
+
+    static abstract class StringProcessingCallback {
+        public abstract void call(String sampleString) throws Exception;
+    }
+
+    void 
checkSampleStringWithDifferentMaxLength(StringAndIntProcessingCallback 
callback) throws Exception {
+        for (String sampleString : sampleStrings) {
+            for (int length : suggestedMaxLengthsToCheck) {
+                callback.call(sampleString, length);
+            }
+        }
+    }
+
+    void checkSampleStrings(StringProcessingCallback callback) throws 
Exception {
+        for (String sampleString : sampleStrings) {
+            callback.call(sampleString);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java 
b/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java
new file mode 100644
index 0000000..f365f17
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockingInputStream extends DrainerTestCase {
+    @Test
+    public void testFastWritingBlockingInputStream() throws Exception {
+        checkSampleStrings( new StringProcessingCallback() {
+            public void call(String sampleString) {
+                readFastBlockingInputStreamAndAssert(sampleString);
+            }
+        });
+    }
+
+    private void readFastBlockingInputStreamAndAssert(String sampleString) {
+        byte []sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8);
+        boolean simulateSlowWriting = false;
+        BlockingInputStream sampleStream = new 
BlockingInputStream(sampleByteArray, simulateSlowWriting);
+        readTillStreamEndAndAssert(sampleByteArray, sampleStream, true);
+    }
+
+    @Test
+    public void testSlowWritingBlockingInputStream() throws Exception {
+        checkSampleStrings( new StringProcessingCallback() {
+            public void call(String sampleString) {
+                readSlowBlockingInputStreamAndAssert(sampleString);
+            }
+        });
+    }
+
+    private void readSlowBlockingInputStreamAndAssert(String sampleString) {
+        byte []sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8);
+        boolean simulateSlowWriting = true;
+        BlockingInputStream sampleStream = new 
BlockingInputStream(sampleByteArray, simulateSlowWriting);
+        assertEquals("No characters should be available to read", 0, 
sampleStream.available());
+        assertTrue("Stream should be in blocked state",sampleString.isEmpty() 
|| sampleStream.checkBlockedAndTryWriteNextChunk());
+        waitAndWriteNextChunks(sampleStream, 600);
+        readTillStreamEndAndAssert(sampleByteArray, sampleStream, true);
+    }
+
+    private void waitAndWriteNextChunks(BlockingInputStream sampleStream, int 
sleepTimeMs) {
+        // at most 4 steps should be enough to produce the full content or 
reach the buffer limit
+        for (int i = 0; i < 4; ++i) {
+            waitAndWriteNextChunk(sampleStream, sleepTimeMs);
+        }
+    }
+
+    /**
+     * Sleeps for the given time and then asks the stream to try to publish its next chunk.
+     *
+     * @param sampleStream stream under test
+     * @param sleepTimeMs  time to sleep before triggering the next write attempt, in milliseconds
+     */
+    private void waitAndWriteNextChunk(BlockingInputStream sampleStream, int sleepTimeMs) {
+        try {
+            Thread.sleep(sleepTimeMs);
+        } catch (final InterruptedException interrupted) {
+            // Do not swallow the interruption: restore the flag so the test runner can still see it.
+            Thread.currentThread().interrupt();
+        }
+        sampleStream.checkBlockedAndTryWriteNextChunk();
+    }
+
+    @Test
+    public void testLimitedWritingBlockingInputStream() throws Exception {
+        checkSampleStrings( new StringProcessingCallback() {
+            public void call(String sampleString) {
+                readLimitedBlockingInputStreamAndAssert(sampleString);
+            }
+        });
+    }
+
+    private void readLimitedBlockingInputStreamAndAssert(String sampleString) {
+        byte []sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8);
+        boolean simulateSlowWriting = true;
+        BlockingInputStream sampleStream = new 
BlockingInputStream(sampleByteArray, simulateSlowWriting);
+        int bufferSize = sampleString.length() / 2;
+        int []smallPauseTimes = {10};
+        sampleStream.setPauseTimes(smallPauseTimes);
+        sampleStream.setBufferSize(bufferSize);
+        assertEquals("No characters should be available to read", 0, 
sampleStream.available());
+        assertTrue("Stream should be in blocked state",sampleString.isEmpty() 
|| sampleStream.checkBlockedAndTryWriteNextChunk());
+        waitAndWriteNextChunks(sampleStream, 20);
+        // buffersize should block the stream
+        assertTrue("Stream should be in blocked state",sampleString.isEmpty() 
|| sampleStream.checkBlockedAndTryWriteNextChunk());
+        assertEquals("Invalid number of characters available", bufferSize, 
sampleStream.available());
+        readTillStreamEndAndAssert(sampleByteArray, sampleStream, false);
+    }
+
+    private void readTillStreamEndAndAssert(byte[] sampleByteArray, 
BlockingInputStream sampleStream,
+                                            boolean 
assertingAlreadyNonBlocked) {
+        int availableCharacters = sampleStream.available();
+        if (assertingAlreadyNonBlocked) {
+            assertFalse("Stream should not be in blocked state", 
sampleStream.checkBlockedAndTryWriteNextChunk());
+            assertEquals("Invalid number of characters available", 
sampleByteArray.length, availableCharacters);
+        }
+        byte[] outputByteArray = new byte[sampleByteArray.length];
+        int writeIndex=0;
+        while (availableCharacters > 0) {
+            int bytesRead = sampleStream.read(outputByteArray, writeIndex, 
availableCharacters);
+            assertEquals("Invalid number of characters read", 
availableCharacters, bytesRead);
+            writeIndex += bytesRead;
+            waitAndWriteNextChunk(sampleStream, 20);
+            availableCharacters = sampleStream.available();
+        }
+        assertTrue("Content read mismatch", Arrays.equals(sampleByteArray, 
outputByteArray));
+    }
+
+    @Test
+    public void testFailure() throws Exception {
+        checkSampleStrings( new StringProcessingCallback() {
+            public void call(String sampleString) {
+                simulateFailureAndAssert(sampleString);
+            }
+        });
+    }
+
+    /**
+     * Creates a fast-writing stream from the sample, reads at most one byte, triggers
+     * {@code simulateFailure()} and asserts that the stream then reports the blocked state while
+     * the number of available bytes is unchanged by the failure.
+     *
+     * @param sampleString content the stream is created from
+     */
+    private void simulateFailureAndAssert(String sampleString) {
+        byte[] sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8);
+        boolean simulateSlowWriting = false;
+        BlockingInputStream sampleStream = new BlockingInputStream(sampleByteArray, simulateSlowWriting);
+        int availableCharacters = sampleStream.available();
+        assertFalse("Stream should not be in blocked state", sampleStream.checkBlockedAndTryWriteNextChunk());
+        assertEquals("Invalid number of characters available", sampleByteArray.length, availableCharacters);
+        if (availableCharacters > 0) {
+            // Consume a single byte so the post-failure count differs from the initial one.
+            byte[] outputByteArray = new byte[1];
+            int bytesRead = sampleStream.read(outputByteArray, 0, 1);
+            assertEquals("Invalid number of characters read", 1, bytesRead);
+        }
+        sampleStream.simulateFailure();
+        assertTrue("Stream should be in blocked state", sampleStream.checkBlockedAndTryWriteNextChunk());
+        int availableCharactersAfterFailure = sampleStream.available();
+        assertEquals("Invalid number of characters available after failure",
+                Math.max(availableCharacters - 1, 0), availableCharactersAfterFailure);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java
 
b/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java
new file mode 100644
index 0000000..fa0fa78
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockingWritesExitValueProcess extends DrainerTestCase {
+    private static final String TEST_STRING = "test string";
+    @Rule
+    public final ExpectedException expectedException = 
ExpectedException.none();
+
+    @Test
+    public void testNonBlockingProcess() {
+        final BlockingWritesExitValueProcess process = 
BlockingWritesExitValueProcess.createFastWritingProcess("", "");
+        assertEquals("Invalid exit value", 
BlockingWritesExitValueProcess.EXIT_VALUE, process.exitValue());
+        assertTrue("Invalid input stream class", process.getInputStream() 
instanceof BlockingInputStream);
+        assertTrue("Invalid error stream class", process.getErrorStream() 
instanceof BlockingInputStream);
+    }
+
+    @Test
+    public void testBlockingInputStreamProcess() {
+        final BlockingWritesExitValueProcess process = 
BlockingWritesExitValueProcess.createBufferLimitedProcess(TEST_STRING,
+                "", 1);
+        expectedException.expect(IllegalThreadStateException.class);
+        process.exitValue();
+    }
+
+    @Test
+    public void testBlockingErrorStreamProcess() {
+        final BlockingWritesExitValueProcess process = 
BlockingWritesExitValueProcess.createBufferLimitedProcess("",
+                TEST_STRING, 1);
+        expectedException.expect(IllegalThreadStateException.class);
+        process.exitValue();
+    }
+
+    @Test
+    public void testFailure() {
+        final BlockingWritesExitValueProcess process = 
BlockingWritesExitValueProcess.createFastWritingProcess("", "");
+        assertEquals("Invalid exit value", 
BlockingWritesExitValueProcess.EXIT_VALUE, process.exitValue());
+        process.simulateFailure();
+        expectedException.expect(IllegalThreadStateException.class);
+        process.exitValue();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java 
b/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java
new file mode 100644
index 0000000..1c30943
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestBufferDrainer extends DrainerTestCase {
+    @Rule
+    public final ExpectedException expectedException = 
ExpectedException.none();
+
+    private AssertionError assertionError;
+    private IOException ioException;
+
+    @Test
+    public void testTooEarlyInputStreamRead() {
+        final BufferDrainer bufferDrainer = new BufferDrainer(null, 0);
+        expectedException.expect(IllegalStateException.class);
+        bufferDrainer.getInputBuffer();
+    }
+
+    @Test
+    public void testTooEarlyErrorStreamRead() {
+        final BufferDrainer bufferDrainer = new BufferDrainer(null, 0);
+        expectedException.expect(IllegalStateException.class);
+        bufferDrainer.getErrorBuffer();
+    }
+
+    @Test
+    public void testMultipleDrainBufferCalls() throws IOException {
+        Process process = 
BlockingWritesExitValueProcess.createFastWritingProcess("", "");
+        final BufferDrainer bufferDrainer = new BufferDrainer(process, 0);
+        bufferDrainer.drainBuffers();
+        expectedException.expect(IllegalStateException.class);
+        bufferDrainer.drainBuffers();
+    }
+
+    @Test
+    public void testReadSinglePass() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException {
+                readSinglePassAndAssert(sampleString, suggestedMaxLength);
+            }
+        });
+    }
+
+    private void readSinglePassAndAssert(String sampleString, int 
suggestedMaxLength) throws IOException {
+        BufferedReader sampleBufferedReader = new BufferedReader(new 
StringReader(sampleString));
+        StringBuffer storageBuffer = new StringBuffer();
+        boolean readAll = false;
+        int charRead = BufferDrainer.drainBuffer(sampleBufferedReader, 
storageBuffer, suggestedMaxLength, 0, readAll);
+        assertTrue("Some characters should have been read but none was", 
sampleString.length() == 0 || charRead > 0);
+        assertTrue("Read character count mismatch", charRead <= 
sampleString.length());
+        assertTrue("Content read mismatch", 
sampleString.startsWith(storageBuffer.toString()));
+    }
+
+    @Test
+    public void testReadTillAvailable() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException {
+                readTillAvailableAndAssert(sampleString, suggestedMaxLength);
+            }
+        });
+    }
+
+    private void readTillAvailableAndAssert(String sampleString, int 
suggestedMaxLength) throws IOException {
+        BufferedReader sampleBufferedReader = new BufferedReader(new 
StringReader(sampleString));
+        StringBuffer storageBuffer = new StringBuffer();
+        boolean readAll = true;
+        int charRead = BufferDrainer.drainBuffer(sampleBufferedReader, 
storageBuffer, suggestedMaxLength, 0, readAll);
+        assertTrue("Read character count mismatch", charRead <= 
sampleString.length());
+        assertTrue("Content read mismatch", 
sampleString.startsWith(storageBuffer.toString()));
+    }
+
+    @Test
+    public void testDrainBuffersImmediatelyEndingProcess() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException {
+                checkDrainBuffers(1, sampleString, "", suggestedMaxLength);
+                checkDrainBuffers(1, "", sampleString.toLowerCase(), 
suggestedMaxLength);
+                checkDrainBuffers(1, sampleString, sampleString.toLowerCase(), 
suggestedMaxLength);
+            }
+        });
+    }
+
+    @Test
+    public void testDrainBuffersShortProcess() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException {
+                checkDrainBuffers(2, sampleString, "", suggestedMaxLength);
+                checkDrainBuffers(2, "", sampleString.toLowerCase(), 
suggestedMaxLength);
+                checkDrainBuffers(2, sampleString, sampleString.toLowerCase(), 
suggestedMaxLength);
+            }
+        });
+    }
+
+    /**
+     * Drains a mocked process and verifies the result. The mock's {@code exitValue()} throws
+     * {@link IllegalThreadStateException} on every call except the {@code runningSteps}-th one,
+     * which returns {@code BlockingWritesExitValueProcess.EXIT_VALUE}.
+     *
+     * @param runningSteps 1-based index of the {@code exitValue()} call that reports termination
+     * @param outputString content served by the mocked process' input stream
+     * @param errorString  content served by the mocked process' error stream
+     * @param maxLength    maximum length passed to the {@code BufferDrainer}
+     * @throws IOException if draining the buffers fails
+     */
+    private void checkDrainBuffers(int runningSteps, String outputString, String errorString, int maxLength)
+            throws IOException {
+        Process process = mock(Process.class);
+        // Typed Answer instead of the raw type: no unchecked warning, and the return type is explicit.
+        when(process.exitValue()).thenAnswer(new Answer<Integer>() {
+            private int invocationCounter = 0;
+
+            @Override
+            public Integer answer(InvocationOnMock invocation) {
+                if (++invocationCounter == runningSteps) {
+                    return BlockingWritesExitValueProcess.EXIT_VALUE;
+                }
+                throw new IllegalThreadStateException("Process is still running");
+            }
+        });
+        byte[] outputByteArray = outputString.getBytes(StandardCharsets.UTF_8);
+        byte[] errorByteArray = errorString.getBytes(StandardCharsets.UTF_8);
+        doReturn(new ByteArrayInputStream(outputByteArray)).when(process).getInputStream();
+        doReturn(new ByteArrayInputStream(errorByteArray)).when(process).getErrorStream();
+        checkDrainBuffers(process, outputString, errorString, maxLength);
+    }
+
+    @Test
+    public void testDrainBuffersFast() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException, InterruptedException {
+                checkDrainBufferFast(sampleString, "", suggestedMaxLength);
+                checkDrainBufferFast("", sampleString.toLowerCase(), 
suggestedMaxLength);
+                checkDrainBufferFast(sampleString, sampleString.toLowerCase(), 
suggestedMaxLength);
+            }
+        });
+    }
+
+    private void checkDrainBufferFast(String outputString, String errorString, 
int suggestedMaxLength)
+            throws IOException, InterruptedException {
+        Process process = 
BlockingWritesExitValueProcess.createFastWritingProcess(outputString, 
errorString);
+        int timeout = calculateTimeoutForTest(false, null);
+        drainProcessAndCheckTimeout(process, timeout, outputString, 
errorString, suggestedMaxLength);
+    }
+
+    @Test
+    public void testDrainBuffersSlowWrite() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException, InterruptedException {
+                checkDrainBufferSlow( sampleString, "", suggestedMaxLength);
+                checkDrainBufferSlow("", sampleString.toLowerCase(), 
suggestedMaxLength);
+                checkDrainBufferSlow(sampleString, sampleString.toLowerCase(), 
suggestedMaxLength);
+            }
+        });
+    }
+
+    private void checkDrainBufferSlow(String outputString, String errorString, 
int suggestedMaxLength)
+            throws IOException, InterruptedException {
+        int catBufferSize = Math.max(outputString.length()/2, 
errorString.length()/2);
+        Process process = 
BlockingWritesExitValueProcess.createBufferLimitedProcess(outputString, 
errorString, catBufferSize);
+        int []defaultPauseTimes = {500};
+        int timeout = calculateTimeoutForTest(true, defaultPauseTimes);
+        drainProcessAndCheckTimeout(process, timeout, outputString, 
errorString, suggestedMaxLength);
+    }
+
+    /**
+     * Runs the buffer-drain check on a separate thread, waits at most {@code timeout} milliseconds
+     * for it, asserts that the elapsed time stayed below the timeout, and rethrows any
+     * {@link AssertionError} or {@link IOException} recorded by the worker thread.
+     *
+     * @param process      process whose streams are drained
+     * @param timeout      maximum allowed run time in milliseconds
+     * @param outputString expected standard output content
+     * @param errorString  expected standard error content
+     * @param maxLength    maximum length passed to the {@code BufferDrainer}
+     * @throws IOException          if the worker thread recorded an I/O failure
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private void drainProcessAndCheckTimeout(Process process, int timeout, String outputString, String errorString,
+                                             int maxLength)
+            throws IOException, InterruptedException {
+        long timeBefore = System.currentTimeMillis();
+        // Clear both failure slots: this method is called many times per test instance, and a
+        // stale exception from an earlier call must not be attributed to this one.
+        assertionError = null;
+        ioException = null;
+        Thread t = createBufferDrainerThread(process, outputString, errorString, maxLength);
+        t.start();
+        t.join(timeout);
+        long timeToRun = System.currentTimeMillis() - timeBefore;
+        assertTrue("drainBuffer test timed out after "+ timeToRun+" ms",  timeToRun < timeout);
+        if (assertionError != null) {
+            throw assertionError;
+        }
+        if (ioException != null) {
+            throw ioException;
+        }
+    }
+
+    private int calculateTimeoutForTest(boolean simulateSlowWriting, int[] 
pauseTimes) {
+        int basicTimeout = 1000;
+        if (simulateSlowWriting) {
+            int timeout = basicTimeout;
+            for (int i=0; i<4; ++i) {
+                int nextPause = pauseTimes == null ? 500 : pauseTimes[i % 
pauseTimes.length];
+                timeout += 2 * nextPause;
+            }
+            return timeout;
+        }
+        else {
+            return basicTimeout;
+        }
+    }
+
+    @Test
+    public void testDrainBuffersLongPause() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException, InterruptedException {
+                // only run some of the tests because of the speed limit
+                boolean runThisTest = sampleString.length() == 0 || 
sampleString.length() < 1024*1024 && suggestedMaxLength > 1024;
+                if (runThisTest) {
+                    checkDrainBuffersLongPause( sampleString, "", 
suggestedMaxLength);
+                    checkDrainBuffersLongPause("", sampleString.toLowerCase(), 
suggestedMaxLength);
+                    checkDrainBuffersLongPause(sampleString, 
sampleString.toLowerCase(), suggestedMaxLength);
+                }
+            }
+        });
+    }
+
+    private void checkDrainBuffersLongPause(String outputString, String 
errorString, int suggestedMaxLength)
+            throws IOException, InterruptedException {
+        int[] longPauseMs = {10*1000};
+        Process process = 
BlockingWritesExitValueProcess.createPausedProcess(outputString, errorString, 
longPauseMs);
+        int timeout = calculateTimeoutForTest(true, longPauseMs);
+        drainProcessAndCheckTimeout(process, timeout, outputString, 
errorString, suggestedMaxLength);
+    }
+
+    @Test
+    public void testDrainBuffersRandomPause() throws Exception {
+        checkSampleStringWithDifferentMaxLength( new 
StringAndIntProcessingCallback() {
+            public void call(String sampleString, int suggestedMaxLength) 
throws IOException, InterruptedException {
+                checkDrainBuffersRandomPause( sampleString, "", 
suggestedMaxLength);
+                checkDrainBuffersRandomPause("", sampleString.toLowerCase(), 
suggestedMaxLength);
+                checkDrainBuffersRandomPause(sampleString, 
sampleString.toLowerCase(), suggestedMaxLength);
+            }
+        });
+    }
+
+    private void checkDrainBuffersRandomPause(String outputString, String 
errorString, int suggestedMaxLength)
+            throws IOException, InterruptedException {
+        int[] pauseTimes = generateRandomPauseIntervals();
+        Process process = 
BlockingWritesExitValueProcess.createPausedProcess(outputString, errorString, 
pauseTimes);
+        int timeout = calculateTimeoutForTest(true, pauseTimes);
+        drainProcessAndCheckTimeout(process, timeout, outputString, 
errorString, suggestedMaxLength);
+    }
+
+    private int[] generateRandomPauseIntervals() {
+        int[] longPauseMs = new int[4];
+        for (int i=0; i<longPauseMs.length; ++i) {
+            longPauseMs[i] = new Random().nextInt((2000)) + 100;
+        }
+        return longPauseMs;
+    }
+
+    private void checkDrainBuffers(Process process, String outputString, 
String errorString, int maxLength) throws IOException {
+        BufferDrainer bufferDrainer = new BufferDrainer(process, maxLength);
+        int exitValue = bufferDrainer.drainBuffers();
+        assertEquals("Invalid exit Value", 
BlockingWritesExitValueProcess.EXIT_VALUE, exitValue);
+        StringBuffer inputBuffer = bufferDrainer.getInputBuffer();
+        StringBuffer errorBuffer = bufferDrainer.getErrorBuffer();
+        assertTrue("Invalid input buffer length", 
inputBuffer.toString().length() >= Math.min(outputString.length(), maxLength));
+        assertTrue("Invalid input buffer", 
outputString.startsWith(inputBuffer.toString()));
+        assertTrue("Invalid error buffer", 
errorString.startsWith(errorBuffer.toString()));
+    }
+
+    @Test
+    public void testParallelDrainBuffers() throws Exception {
+        String sampleString = generateString(1024);
+        checkParallelDrainBuffers(20, sampleString, 
sampleString.toLowerCase(), false);
+    }
+
+    @Test
+    public void testParallelDrainBuffersWithFailure() throws Exception {
+        String sampleString = generateString(1024);
+        checkParallelDrainBuffers(20, sampleString, 
sampleString.toLowerCase(), true);
+    }
+
+    /**
+     * Starts {@code threadNum} drainer threads in parallel, each against its own paused process
+     * with randomized content and pause times, waits for all of them within the largest computed
+     * timeout, and rethrows any {@link AssertionError} or {@link IOException} a worker recorded.
+     *
+     * @param threadNum            number of parallel drainer threads to start and wait for
+     * @param outputString         base standard output content; each thread uses a random prefix of it
+     * @param errorString          base standard error content; each thread uses a random prefix of it
+     * @param alsoAddFailingThread whether to additionally start a drainer on a process that is then
+     *                             switched to failure mode; that extra thread is not waited for
+     * @throws IOException          if a worker thread recorded an I/O failure
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private void checkParallelDrainBuffers(int threadNum, String outputString, String errorString,
+                                           boolean alsoAddFailingThread)
+            throws IOException, InterruptedException {
+        Thread[] threads = new Thread[threadNum];
+        // Clear both failure slots so a stale exception from an earlier run is not rethrown below.
+        assertionError = null;
+        ioException = null;
+        int maxTimeout = 0;
+        if (alsoAddFailingThread) {
+            BlockingWritesExitValueProcess failingProcess = BlockingWritesExitValueProcess.createFastWritingProcess("","");
+            createBufferDrainerThread(failingProcess, "", "", 1024).start();
+            failingProcess.simulateFailure();
+        }
+        for (int i = 0; i < threads.length; ++i) {
+            String randomizedOutputString = randomSubstring(outputString);
+            String randomizedErrorString = randomSubstring(errorString);
+            int[] pauseTimes = generateRandomPauseIntervals();
+            Process process = BlockingWritesExitValueProcess.createPausedProcess(randomizedOutputString,
+                    randomizedErrorString, pauseTimes);
+            // All threads share one deadline: the longest individual timeout.
+            int timeout = calculateTimeoutForTest(true, pauseTimes);
+            if (timeout > maxTimeout) {
+                maxTimeout = timeout;
+            }
+            threads[i] = createBufferDrainerThread(process, randomizedOutputString, randomizedErrorString, 1024);
+            threads[i].start();
+        }
+        waitThreadsToFinishInTime(threads, maxTimeout);
+        if (assertionError != null) {
+            throw assertionError;
+        }
+        if (ioException != null) {
+            throw ioException;
+        }
+    }
+
+    private Thread createBufferDrainerThread(Process process, String 
randomizedOutputString, String randomizedErrorString,
+                                             int maxLength) {
+        return new Thread() {
+            public void run() {
+                try {
+                    checkDrainBuffers(process, randomizedOutputString, 
randomizedErrorString, maxLength);
+                } catch (IOException e) {
+                    ioException = e;
+                } catch (AssertionError e) {
+                    assertionError = e;
+                }
+            }
+        };
+    }
+
+    private void waitThreadsToFinishInTime(Thread[] threads, int maxTimeout) 
throws InterruptedException {
+        long timeBefore = System.currentTimeMillis();
+        for (Thread t : threads) {
+            long timeToRunSoFar = System.currentTimeMillis() - timeBefore;
+            long remainingTime = maxTimeout - timeToRunSoFar;
+            if (remainingTime > 0) {
+                t.join(remainingTime);
+            }
+            else {
+                fail("drainBuffer test timed out after "+ timeToRunSoFar+" 
ms");
+            }
+        }
+        long timeToRunTotal = System.currentTimeMillis() - timeBefore;
+        assertTrue("drainBuffer test timed out after "+ timeToRunTotal+" ms",  
timeToRunTotal < maxTimeout);
+    }
+
+    private String randomSubstring(String string) {
+        int minLength = (int)(0.8 * string.length());
+        int maxLength = string.length();
+        int length = new Random().nextInt((maxLength - minLength) + 1) + 
minLength;
+        return string.substring(0, length);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6e30b3e..b0dd230 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-3382 [SSH action] [SSH action] Optimize process streams draining 
(asalamon74 via andras.piros)
 OOZIE-3384 [tests] 
TestWorkflowActionRetryInfoXCommand#testRetryConsoleUrlForked() is flaky 
(asalamon74 via kmarton)
 OOZIE-3386 Misleading error message when workflow application does not exist 
(kmarton)
 OOZIE-3120 Upgrade maven-assembly plugin to v.3.1.0 (dionusos via kmarton)

Reply via email to