Repository: oozie Updated Branches: refs/heads/master 7ef418f7f -> 8013a945f
OOZIE-3382 [SSH action] [SSH action] Optimize process streams draining (asalamon74 via andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8013a945 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8013a945 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8013a945 Branch: refs/heads/master Commit: 8013a945f16b5a10ad1fdd898ea10fcdb838117e Parents: 7ef418f Author: Andras Piros <andras.pi...@cloudera.com> Authored: Sat Dec 1 13:30:12 2018 +0100 Committer: Andras Piros <andras.pi...@cloudera.com> Committed: Sat Dec 1 13:30:12 2018 +0100 ---------------------------------------------------------------------- .../oozie/action/ssh/SshActionExecutor.java | 125 +------ .../org/apache/oozie/util/BufferDrainer.java | 164 +++++++++ .../apache/oozie/util/BlockingInputStream.java | 134 +++++++ .../util/BlockingWritesExitValueProcess.java | 132 +++++++ .../org/apache/oozie/util/DrainerTestCase.java | 76 ++++ .../oozie/util/TestBlockingInputStream.java | 153 ++++++++ .../TestBlockingWritesExitValueProcess.java | 65 ++++ .../apache/oozie/util/TestBufferDrainer.java | 363 +++++++++++++++++++ release-log.txt | 1 + 9 files changed, 1106 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java index 3e0e3c5..1e37e80 100644 --- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java @@ -41,6 +41,7 @@ import org.apache.oozie.service.CallbackService; import org.apache.oozie.service.ConfigurationService; import 
org.apache.oozie.servlet.CallbackServlet; import org.apache.oozie.service.Services; +import org.apache.oozie.util.BufferDrainer; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.PropertiesUtils; import org.apache.oozie.util.XLog; @@ -146,11 +147,11 @@ public class SshActionExecutor extends ActionExecutor { LOG.debug("Ssh command [{0}]", dataCommand); try { final Process process = Runtime.getRuntime().exec(dataCommand.split("\\s")); - - final StringBuffer outBuffer = new StringBuffer(); - final StringBuffer errBuffer = new StringBuffer(); + final BufferDrainer bufferDrainer = new BufferDrainer(process, maxLen); + bufferDrainer.drainBuffers(); + final StringBuffer outBuffer = bufferDrainer.getInputBuffer(); + final StringBuffer errBuffer = bufferDrainer.getErrorBuffer(); boolean overflow = false; - drainBuffers(process, outBuffer, errBuffer, maxLen); LOG.trace("outBuffer={0}", outBuffer); LOG.trace("errBuffer={0}", errBuffer); if (outBuffer.length() > maxLen) { @@ -306,11 +307,11 @@ public class SshActionExecutor extends ActionExecutor { String outFile = getRemoteFileName(context, action, "pid", false, false); String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile; try { - Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s")); - StringBuffer buffer = new StringBuffer(); - drainBuffers(process, buffer, null, maxLen); + final Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s")); + final BufferDrainer bufferDrainer = new BufferDrainer(process, maxLen); + bufferDrainer.drainBuffers(); + final StringBuffer buffer = bufferDrainer.getInputBuffer(); String pid = getFirstLine(buffer); - if (Long.valueOf(pid) > 0) { return pid; } @@ -358,8 +359,9 @@ public class SshActionExecutor extends ActionExecutor { Runtime runtime = Runtime.getRuntime(); Process p = runtime.exec(command.split("\\s")); - StringBuffer errorBuffer = new StringBuffer(); - int exitValue = drainBuffers(p, null, errorBuffer, maxLen); + final 
BufferDrainer bufferDrainer = new BufferDrainer(p, maxLen); + final int exitValue = bufferDrainer.drainBuffers(); + final StringBuffer errorBuffer = bufferDrainer.getErrorBuffer(); if (exitValue != 0) { String error = getTruncatedString(errorBuffer); @@ -447,12 +449,11 @@ public class SshActionExecutor extends ActionExecutor { LOG.trace("Executing SSH command [finalCommand={0}]", Arrays.toString(finalCommand)); final Process p = runtime.exec(finalCommand); - final StringBuffer inputBuffer = new StringBuffer(); - final StringBuffer errorBuffer = new StringBuffer(); - final int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen); - + BufferDrainer bufferDrainer = new BufferDrainer(p, maxLen); + final int exitValue = bufferDrainer.drainBuffers(); + final StringBuffer inputBuffer = bufferDrainer.getInputBuffer(); + final StringBuffer errorBuffer = bufferDrainer.getErrorBuffer(); final String pid = getFirstLine(inputBuffer); - if (exitValue != 0) { String error = getTruncatedString(errorBuffer); throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: " @@ -504,7 +505,8 @@ public class SshActionExecutor extends ActionExecutor { Process ps = null; try { ps = Runtime.getRuntime().exec(command.split("\\s")); - returnValue = drainBuffers(ps, null, null, 0); + final BufferDrainer bufferDrainer = new BufferDrainer(ps, 0); + returnValue = bufferDrainer.drainBuffers(); } catch (IOException e) { throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format( @@ -729,97 +731,6 @@ public class SshActionExecutor extends ActionExecutor { } /** - * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a - * buffer is provided for the stream. - * - * @param p The Process instance. - * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required. 
- * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required. - * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the - * store content may exceed this length. - * @return the exit value of the processSettings. - * @throws IOException - */ - private int drainBuffers(final Process p, final StringBuffer inputBuffer, final StringBuffer errorBuffer, final int maxLength) - throws IOException { - LOG.trace("drainBuffers() start"); - - int exitValue = -1; - - int inBytesRead = 0; - int errBytesRead = 0; - - boolean processEnded = false; - - try (final BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream(), Charsets.UTF_8)); - final BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream(), Charsets.UTF_8))) { - // Here we do some kind of busy waiting, checking whether the process has finished by calling Process#exitValue(). - // If not yet finished, an IllegalThreadStateException is thrown and ignored, the progress on stdout and stderr read, - // and retried until the process has ended. - // Note that Process#waitFor() may block sometimes, that's why we do a polling mechanism using Process#exitValue() - // instead. Until we extend unit and integration test coverage for SSH action, and we can introduce a more sophisticated - // error handling based on the extended coverage, this solution should stay in place. 
- while (!processEnded) { - try { - // Doesn't block but throws IllegalThreadStateException if the process hasn't finished yet - exitValue = p.exitValue(); - processEnded = true; - } - catch (final IllegalThreadStateException itse) { - // Continue to drain - } - - // Drain input and error streams - inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded); - errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded); - - // Necessary evil: sleep and retry - if (!processEnded) { - try { - Thread.sleep(500); - } - catch (final InterruptedException ie) { - // Sleep a little, then check again - } - } - } - } - - LOG.trace("drainBuffers() end [exitValue={0}]", exitValue); - - return exitValue; - } - - /** - * Reads the contents of a stream and stores them into the provided buffer. - * - * @param br The stream to be read. - * @param storageBuf The buffer into which the contents of the stream are to be stored. - * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be - * exceeded. - * @param bytesRead The number of bytes read from this stream to date. - * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk - * of data is read, irrespective of how much is available. - * @return bReadSession returns drainBuffer for stream of contents - * @throws IOException - */ - private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll) - throws IOException { - int bReadSession = 0; - if (br.ready()) { - char[] buf = new char[1024]; - do { - int bReadCurrent = br.read(buf, 0, 1024); - if (storageBuf != null && bytesRead < maxLength) { - storageBuf.append(buf, 0, bReadCurrent); - } - bReadSession += bReadCurrent; - } while (br.ready() && readAll); - } - return bReadSession; - } - - /** * Returns the first line from a StringBuffer, recognized by the new line character \n. 
* * @param buffer The StringBuffer from which the first line is required. http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/main/java/org/apache/oozie/util/BufferDrainer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/BufferDrainer.java b/core/src/main/java/org/apache/oozie/util/BufferDrainer.java new file mode 100644 index 0000000..304fd6d --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/BufferDrainer.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +public class BufferDrainer { + + private final XLog LOG = XLog.getLog(getClass()); + private static final int DRAIN_BUFFER_SLEEP_TIME_MS = 500; + private final Process process; + private final int maxLength; + private boolean drainBuffersFinished; + private final StringBuffer inputBuffer; + private final StringBuffer errorBuffer; + + /** + * @param process The Process instance. 
+ * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the + * stored content may exceed this length. + */ + public BufferDrainer(Process process, int maxLength) { + this.process = process; + this.maxLength = maxLength; + drainBuffersFinished = false; + inputBuffer = new StringBuffer(); + errorBuffer = new StringBuffer(); + } + + /** + * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a + * buffer is provided for the stream. + * + * @return the exit value of the process. + * @throws IOException + */ + public int drainBuffers() throws IOException { + if (drainBuffersFinished) { + throw new IllegalStateException("Buffer draining has already been finished"); + } + LOG.trace("drainBuffers() start"); + + int exitValue = -1; + + int inBytesRead = 0; + int errBytesRead = 0; + + boolean processEnded = false; + + try (final BufferedReader ir = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.UTF_8)); + final BufferedReader er = new BufferedReader(new InputStreamReader(process.getErrorStream(), Charsets.UTF_8))) { + // Here we do some kind of busy waiting, checking whether the process has finished by calling Process#exitValue(). + // If not yet finished, an IllegalThreadStateException is thrown and ignored, the progress on stdout and stderr read, + // and retried until the process has ended. + // Note that Process#waitFor() may block sometimes, that's why we do a polling mechanism using Process#exitValue() + // instead. Until we extend unit and integration test coverage for SSH action, and we can introduce a more + // sophisticated error handling based on the extended coverage, this solution should stay in place. 
+ while (!processEnded) { + try { + // Doesn't block but throws IllegalThreadStateException if the process hasn't finished yet + exitValue = process.exitValue(); + processEnded = true; + } + catch (final IllegalThreadStateException itse) { + // Continue to drain + } + + // Drain input and error streams + inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded); + errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded); + + // Necessary evil: sleep and retry + if (!processEnded) { + try { + LOG.trace("Sleeping {}ms during buffer draining", DRAIN_BUFFER_SLEEP_TIME_MS); + Thread.sleep(DRAIN_BUFFER_SLEEP_TIME_MS); + } + catch (final InterruptedException ie) { + // Sleep a little, then check again + } + } + } + } + + LOG.trace("drainBuffers() end [exitValue={0}]", exitValue); + drainBuffersFinished = true; + return exitValue; + } + + public StringBuffer getInputBuffer() { + if (drainBuffersFinished) { + return inputBuffer; + } + else { + throw new IllegalStateException("Buffer draining has not been finished yet"); + } + } + + public StringBuffer getErrorBuffer() { + if (drainBuffersFinished) { + return errorBuffer; + } + else { + throw new IllegalStateException("Buffer draining has not been finished yet"); + } + } + + /** + * Reads the contents of a stream and stores them into the provided buffer. + * + * @param br The stream to be read. + * @param storageBuf The buffer into which the contents of the stream are to be stored. + * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be + * exceeded. + * @param bytesRead The number of bytes read from this stream to date. + * @param readAll If true, the stream is drained while there is data available in it. Otherwise, only a single chunk + * of data is read, irrespective of how much is available. 
+ * @return the number of characters read from the stream during this call + * @throws IOException + */ + @VisibleForTesting + static int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll) + throws IOException { + int bReadSession = 0; + int chunkSize = 1024; + if (br.ready()) { + char[] buf = new char[1024]; + int bReadCurrent; + boolean wantsToReadFurther; + do { + bReadCurrent = br.read(buf, 0, chunkSize); + if (storageBuf != null && bytesRead < maxLength && bReadCurrent != -1) { + storageBuf.append(buf, 0, bReadCurrent); + } + if (bReadCurrent != -1) { + bReadSession += bReadCurrent; + } + wantsToReadFurther = bReadCurrent != -1 && (readAll || bReadCurrent == chunkSize); + } while (br.ready() && wantsToReadFurther); + } + return bReadSession; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java b/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java new file mode 100644 index 0000000..3472bc6 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/BlockingInputStream.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util; + +import com.google.common.annotations.VisibleForTesting; + +import java.io.ByteArrayInputStream; +import java.util.Random; + +/** + * A {@code ByteArrayInputStream} where the full data is not available at the beginning but produced on the fly. Simulates + * an external process generating the data. + * + * <p> + * {@code simulateSlowWriting} simulates when the process writes data in chunks instead of writing all the data + * at the beginning. + * <p> + * It is possible to specify the pause times before writing the next chunk. + * <p> + * It is possible to specify {@code bufferSize}, if the buffer contains more unread data, the process reports + * itself as running. 
+ * + */ +public class BlockingInputStream extends ByteArrayInputStream { + private static final int CHUNK_WRITE_DELAY_MS = 500; + private static final double CHUNK_WRITE_MIN_RATIO = 0.25; + private int readPosition; + private int writePosition; + private boolean simulateSlowWriting; + private int bufferSize; + private long lastTimeDataWritten; + private int pauseIndex; + private int[] pauseTimes = {CHUNK_WRITE_DELAY_MS}; + private boolean failed; + + BlockingInputStream(byte buf[], boolean simulateSlowWriting) { + super(buf); + this.bufferSize = buf.length; + this.simulateSlowWriting = simulateSlowWriting; + writeFirstChunk(buf, simulateSlowWriting); + } + + void setBufferSize(int bufferSize) { + if (!simulateSlowWriting) { + throw new IllegalArgumentException("Cannot specify bufferSize"); + } + this.bufferSize = bufferSize; + } + + void setPauseTimes(int[] pauseTimes) { + this.pauseTimes = pauseTimes; + } + + private void writeFirstChunk(byte[] buf, boolean simulateSlowWriting) { + if (simulateSlowWriting) { + writeNextChunk(0); + } + else { + writeNextChunk(buf.length); + } + } + + @Override + public synchronized int read(byte b[], int off, int len) { + final int bytesReadyToRead = writePosition - readPosition; + final int bytesToRead = Math.min(len, bytesReadyToRead); + final int readBytes = super.read(b, off, bytesToRead); + final boolean someBytesRead = readBytes != -1; + if (someBytesRead) { + readPosition += readBytes; + } + return readBytes; + } + + @Override + public synchronized int available() { + return writePosition - readPosition; + } + + @VisibleForTesting + boolean checkBlockedAndTryWriteNextChunk() { + boolean fullBufferReady = writePosition == buf.length; + boolean readFullBuffer = readPosition == buf.length; + boolean thereAreNoBytesInBuffer = writePosition - readPosition == 0; + boolean isBlocked = failed || (!readFullBuffer && (!fullBufferReady || thereAreNoBytesInBuffer )); + tryWriteNextChunk(); + return isBlocked; + } + + private boolean 
readyToWriteNextChunk() { + return System.currentTimeMillis() - lastTimeDataWritten > getPauseTimeMs(pauseIndex); + } + + private int getPauseTimeMs(int index) { + return pauseTimes[index % pauseTimes.length]; + } + + private void tryWriteNextChunk() { + if (!failed && writePosition < buf.length && available() < bufferSize && readyToWriteNextChunk()) { + int maxSize = Math.min(bufferSize - available(), buf.length - writePosition); + int minSize = Math.min((int)(CHUNK_WRITE_MIN_RATIO * buf.length +1), maxSize); + int nextChunkSize = new Random().nextInt((maxSize - minSize) + 1) + minSize; + writeNextChunk(nextChunkSize); + } + } + + private void writeNextChunk(int chunkSize) { + writePosition += chunkSize; + lastTimeDataWritten = System.currentTimeMillis(); + if (chunkSize > 0) { + ++pauseIndex; + } + } + + void simulateFailure() { + failed = true; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java b/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java new file mode 100644 index 0000000..116cab3 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/BlockingWritesExitValueProcess.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +/** + * Simulates different processes writing to standard output / error. + * <p> + * {@code exitValue()} throws {@code IllegalThreadStateException} while the buffers are blocked. + * <p> + * Uses {@code BlockingInputStream} + */ +public class BlockingWritesExitValueProcess extends Process { + static final int EXIT_VALUE = 1; + + private BlockingInputStream inputStream; + private BlockingInputStream errorStream; + private String outputString; + private String errorString; + private int bufferSize = Integer.MAX_VALUE; + private boolean simulateSlowWriting; + private int[] outputPauseTimes; + private int[] errorPauseTimes; + + private BlockingWritesExitValueProcess(String outputString, String errorString) { + this.outputString = outputString; + this.errorString = errorString; + } + + static BlockingWritesExitValueProcess createFastWritingProcess(String outputString, String errorString) { + BlockingWritesExitValueProcess process = new BlockingWritesExitValueProcess(outputString, errorString); + process.generateStreams(); + return process; + } + + static BlockingWritesExitValueProcess createBufferLimitedProcess(String outputString, String errorString, int bufferSize) { + BlockingWritesExitValueProcess process = new BlockingWritesExitValueProcess(outputString, errorString); + process.simulateSlowWriting = true; + process.bufferSize = bufferSize; + process.generateStreams(); + return process; + } + + static 
BlockingWritesExitValueProcess createPausedProcess(String outputString, String errorString, int[] pauseTimes) { + BlockingWritesExitValueProcess process = new BlockingWritesExitValueProcess(outputString, errorString); + process.simulateSlowWriting = true; + process.outputPauseTimes = pauseTimes; + process.errorPauseTimes = pauseTimes; + process.generateStreams(); + return process; + } + + private void generateStreams() { + byte []outputByteArray = outputString.getBytes(StandardCharsets.UTF_8); + byte []errorByteArray = errorString.getBytes(StandardCharsets.UTF_8); + inputStream = new BlockingInputStream(outputByteArray, simulateSlowWriting); + if (simulateSlowWriting) { + inputStream.setBufferSize(bufferSize); + } + if (outputPauseTimes != null) { + inputStream.setPauseTimes(outputPauseTimes); + } + errorStream = new BlockingInputStream(errorByteArray, simulateSlowWriting); + if (simulateSlowWriting) { + errorStream.setBufferSize(bufferSize); + } + if (errorPauseTimes != null) { + errorStream.setPauseTimes(errorPauseTimes); + } + } + + + @Override + public OutputStream getOutputStream() { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getInputStream() { + return inputStream; + } + + @Override + public InputStream getErrorStream() { + return errorStream; + } + + @Override + public int waitFor() { + throw new UnsupportedOperationException(); + } + + @Override + public int exitValue() { + boolean inputStreamBlocked = inputStream.checkBlockedAndTryWriteNextChunk(); + boolean errorStreamBlocked = errorStream.checkBlockedAndTryWriteNextChunk(); + if (inputStreamBlocked || errorStreamBlocked) { + throw new IllegalThreadStateException("Process is still running"); + } + return EXIT_VALUE; + } + + @Override + public void destroy() { + throw new UnsupportedOperationException(); + } + + void simulateFailure() { + inputStream.simulateFailure(); + errorStream.simulateFailure(); + } + +} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java b/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java new file mode 100644 index 0000000..2221e61 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/DrainerTestCase.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util; + +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.List; + +public class DrainerTestCase { + + private static final String HEX_CHARACTERS = "0123456789ABCDEF"; + private static final List<String> sampleStrings = new ArrayList<>(); + private static final int[] suggestedMaxLengthsToCheck = {1, 1024, 1024*1024}; + private static final int[] stringLengthsToCheck = {0, 16, 4096, 1024*1024, 20*1024*1024}; + + @BeforeClass + public static void setUpBeforeClass() { + if (sampleStrings.isEmpty()) { + generateSamples(); + } + } + + private static void generateSamples() { + for (int length : stringLengthsToCheck) { + sampleStrings.add(generateString(length)); + } + } + + static String generateString(int length) { + StringBuffer sb = new StringBuffer(); + for (int i=0; i<length/HEX_CHARACTERS.length(); ++i) { + sb.append(HEX_CHARACTERS); + } + sb.append(HEX_CHARACTERS.substring(0,length % HEX_CHARACTERS.length())); + return sb.toString(); + } + + static abstract class StringAndIntProcessingCallback { + public abstract void call(String sampleString, int suggestedMaxLength) throws Exception; + } + + static abstract class StringProcessingCallback { + public abstract void call(String sampleString) throws Exception; + } + + void checkSampleStringWithDifferentMaxLength(StringAndIntProcessingCallback callback) throws Exception { + for (String sampleString : sampleStrings) { + for (int length : suggestedMaxLengthsToCheck) { + callback.call(sampleString, length); + } + } + } + + void checkSampleStrings(StringProcessingCallback callback) throws Exception { + for (String sampleString : sampleStrings) { + callback.call(sampleString); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java b/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java new file mode 100644 index 0000000..f365f17 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/TestBlockingInputStream.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util; + +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestBlockingInputStream extends DrainerTestCase { + @Test + public void testFastWritingBlockingInputStream() throws Exception { + checkSampleStrings( new StringProcessingCallback() { + public void call(String sampleString) { + readFastBlockingInputStreamAndAssert(sampleString); + } + }); + } + + private void readFastBlockingInputStreamAndAssert(String sampleString) { + byte []sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8); + boolean simulateSlowWriting = false; + BlockingInputStream sampleStream = new BlockingInputStream(sampleByteArray, simulateSlowWriting); + readTillStreamEndAndAssert(sampleByteArray, sampleStream, true); + } + + @Test + public void testSlowWritingBlockingInputStream() throws Exception { + checkSampleStrings( new StringProcessingCallback() { + public void call(String sampleString) { + readSlowBlockingInputStreamAndAssert(sampleString); + } + }); + } + + private void readSlowBlockingInputStreamAndAssert(String sampleString) { + byte []sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8); + boolean simulateSlowWriting = true; + BlockingInputStream sampleStream = new BlockingInputStream(sampleByteArray, simulateSlowWriting); + assertEquals("No characters should be available to read", 0, sampleStream.available()); + assertTrue("Stream should be in blocked state",sampleString.isEmpty() || sampleStream.checkBlockedAndTryWriteNextChunk()); + waitAndWriteNextChunks(sampleStream, 600); + readTillStreamEndAndAssert(sampleByteArray, sampleStream, true); + } + + private void waitAndWriteNextChunks(BlockingInputStream sampleStream, int sleepTimeMs) { + // at most 4 steps should be enough to produce the full content or reach the buffer limit + for 
(int i = 0; i < 4; ++i) { + waitAndWriteNextChunk(sampleStream, sleepTimeMs); + } + } + + private void waitAndWriteNextChunk(BlockingInputStream sampleStream, int sleepTimeMs) { + try { + Thread.sleep(sleepTimeMs); + } catch (final InterruptedException ignoredException) { + } + sampleStream.checkBlockedAndTryWriteNextChunk(); + } + + @Test + public void testLimitedWritingBlockingInputStream() throws Exception { + checkSampleStrings( new StringProcessingCallback() { + public void call(String sampleString) { + readLimitedBlockingInputStreamAndAssert(sampleString); + } + }); + } + + private void readLimitedBlockingInputStreamAndAssert(String sampleString) { + byte []sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8); + boolean simulateSlowWriting = true; + BlockingInputStream sampleStream = new BlockingInputStream(sampleByteArray, simulateSlowWriting); + int bufferSize = sampleString.length() / 2; + int []smallPauseTimes = {10}; + sampleStream.setPauseTimes(smallPauseTimes); + sampleStream.setBufferSize(bufferSize); + assertEquals("No characters should be available to read", 0, sampleStream.available()); + assertTrue("Stream should be in blocked state",sampleString.isEmpty() || sampleStream.checkBlockedAndTryWriteNextChunk()); + waitAndWriteNextChunks(sampleStream, 20); + // buffersize should block the stream + assertTrue("Stream should be in blocked state",sampleString.isEmpty() || sampleStream.checkBlockedAndTryWriteNextChunk()); + assertEquals("Invalid number of characters available", bufferSize, sampleStream.available()); + readTillStreamEndAndAssert(sampleByteArray, sampleStream, false); + } + + private void readTillStreamEndAndAssert(byte[] sampleByteArray, BlockingInputStream sampleStream, + boolean assertingAlreadyNonBlocked) { + int availableCharacters = sampleStream.available(); + if (assertingAlreadyNonBlocked) { + assertFalse("Stream should not be in blocked state", sampleStream.checkBlockedAndTryWriteNextChunk()); + assertEquals("Invalid 
number of characters available", sampleByteArray.length, availableCharacters); + } + byte[] outputByteArray = new byte[sampleByteArray.length]; + int writeIndex=0; + while (availableCharacters > 0) { + int bytesRead = sampleStream.read(outputByteArray, writeIndex, availableCharacters); + assertEquals("Invalid number of characters read", availableCharacters, bytesRead); + writeIndex += bytesRead; + waitAndWriteNextChunk(sampleStream, 20); + availableCharacters = sampleStream.available(); + } + assertTrue("Content read mismatch", Arrays.equals(sampleByteArray, outputByteArray)); + } + + @Test + public void testFailure() throws Exception { + checkSampleStrings( new StringProcessingCallback() { + public void call(String sampleString) { + simulateFailureAndAssert(sampleString); + } + }); + } + + private void simulateFailureAndAssert(String sampleString) { + byte []sampleByteArray = sampleString.getBytes(StandardCharsets.UTF_8); + boolean simulateSlowWriting = false; + BlockingInputStream sampleStream = new BlockingInputStream(sampleByteArray, simulateSlowWriting); + int availableCharacters = sampleStream.available(); + assertFalse("Stream should not be in blocked state", sampleStream.checkBlockedAndTryWriteNextChunk()); + assertEquals("Invalid number of characters available", sampleByteArray.length, availableCharacters); + if (availableCharacters > 0) { + byte []outputByteArray = new byte[1]; + int bytesRead = sampleStream.read(outputByteArray, 0, 1); + assertEquals("Invalid number of characters read", 1, bytesRead); + } + sampleStream.simulateFailure(); + assertTrue("Stream should be in blocked state", sampleStream.checkBlockedAndTryWriteNextChunk()); + int availableCharactersAfterFailure = sampleStream.available(); + assertEquals("Invalid numbe of characters available after failure", + Math.max(availableCharacters-1, 0), availableCharactersAfterFailure); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java b/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java new file mode 100644 index 0000000..fa0fa78 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/TestBlockingWritesExitValueProcess.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.util; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestBlockingWritesExitValueProcess extends DrainerTestCase { + private static final String TEST_STRING = "test string"; + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testNonBlockingProcess() { + final BlockingWritesExitValueProcess process = BlockingWritesExitValueProcess.createFastWritingProcess("", ""); + assertEquals("Invalid exit value", BlockingWritesExitValueProcess.EXIT_VALUE, process.exitValue()); + assertTrue("Invalid input stream class", process.getInputStream() instanceof BlockingInputStream); + assertTrue("Invalid error stream class", process.getErrorStream() instanceof BlockingInputStream); + } + + @Test + public void testBlockingInputStreamProcess() { + final BlockingWritesExitValueProcess process = BlockingWritesExitValueProcess.createBufferLimitedProcess(TEST_STRING, + "", 1); + expectedException.expect(IllegalThreadStateException.class); + process.exitValue(); + } + + @Test + public void testBlockingErrorStreamProcess() { + final BlockingWritesExitValueProcess process = BlockingWritesExitValueProcess.createBufferLimitedProcess("", + TEST_STRING, 1); + expectedException.expect(IllegalThreadStateException.class); + process.exitValue(); + } + + @Test + public void testFailure() { + final BlockingWritesExitValueProcess process = BlockingWritesExitValueProcess.createFastWritingProcess("", ""); + assertEquals("Invalid exit value", BlockingWritesExitValueProcess.EXIT_VALUE, process.exitValue()); + process.simulateFailure(); + expectedException.expect(IllegalThreadStateException.class); + process.exitValue(); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java b/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java new file mode 100644 index 0000000..1c30943 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/TestBufferDrainer.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestBufferDrainer extends DrainerTestCase { + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private AssertionError assertionError; + private IOException ioException; + + @Test + public void testTooEarlyInputStreamRead() { + final BufferDrainer bufferDrainer = new BufferDrainer(null, 0); + expectedException.expect(IllegalStateException.class); + bufferDrainer.getInputBuffer(); + } + + @Test + public void testTooEarlyErrorStreamRead() { + final BufferDrainer bufferDrainer = new BufferDrainer(null, 0); + expectedException.expect(IllegalStateException.class); + bufferDrainer.getErrorBuffer(); + } + + @Test + public void testMultipleDrainBufferCalls() throws IOException { + Process process = BlockingWritesExitValueProcess.createFastWritingProcess("", ""); + final BufferDrainer bufferDrainer = new BufferDrainer(process, 0); + bufferDrainer.drainBuffers(); + expectedException.expect(IllegalStateException.class); + bufferDrainer.drainBuffers(); + } + + @Test + public void testReadSinglePass() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException { + 
readSinglePassAndAssert(sampleString, suggestedMaxLength); + } + }); + } + + private void readSinglePassAndAssert(String sampleString, int suggestedMaxLength) throws IOException { + BufferedReader sampleBufferedReader = new BufferedReader(new StringReader(sampleString)); + StringBuffer storageBuffer = new StringBuffer(); + boolean readAll = false; + int charRead = BufferDrainer.drainBuffer(sampleBufferedReader, storageBuffer, suggestedMaxLength, 0, readAll); + assertTrue("Some characters should have been read but none was", sampleString.length() == 0 || charRead > 0); + assertTrue("Read character count mismatch", charRead <= sampleString.length()); + assertTrue("Content read mismatch", sampleString.startsWith(storageBuffer.toString())); + } + + @Test + public void testReadTillAvailable() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException { + readTillAvailableAndAssert(sampleString, suggestedMaxLength); + } + }); + } + + private void readTillAvailableAndAssert(String sampleString, int suggestedMaxLength) throws IOException { + BufferedReader sampleBufferedReader = new BufferedReader(new StringReader(sampleString)); + StringBuffer storageBuffer = new StringBuffer(); + boolean readAll = true; + int charRead = BufferDrainer.drainBuffer(sampleBufferedReader, storageBuffer, suggestedMaxLength, 0, readAll); + assertTrue("Read character count mismatch", charRead <= sampleString.length()); + assertTrue("Content read mismatch", sampleString.startsWith(storageBuffer.toString())); + } + + @Test + public void testDrainBuffersImmediatelyEndingProcess() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException { + checkDrainBuffers(1, sampleString, "", suggestedMaxLength); + checkDrainBuffers(1, "", 
sampleString.toLowerCase(), suggestedMaxLength); + checkDrainBuffers(1, sampleString, sampleString.toLowerCase(), suggestedMaxLength); + } + }); + } + + @Test + public void testDrainBuffersShortProcess() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException { + checkDrainBuffers(2, sampleString, "", suggestedMaxLength); + checkDrainBuffers(2, "", sampleString.toLowerCase(), suggestedMaxLength); + checkDrainBuffers(2, sampleString, sampleString.toLowerCase(), suggestedMaxLength); + } + }); + } + + private void checkDrainBuffers(int runningSteps, String outputString, String errorString, int maxLength) throws IOException { + Process process = mock(Process.class); + when(process.exitValue()).thenAnswer(new Answer() { + private int invocationCounter = 0; + + public Object answer(InvocationOnMock invocation) { + if (++invocationCounter == runningSteps) { + return BlockingWritesExitValueProcess.EXIT_VALUE; + } + throw new IllegalThreadStateException("Process is still running"); + } + }); + byte []outputByteArray = outputString.getBytes(StandardCharsets.UTF_8); + byte []errorByteArray = errorString.getBytes(StandardCharsets.UTF_8); + doReturn(new ByteArrayInputStream(outputByteArray)).when(process).getInputStream(); + doReturn(new ByteArrayInputStream(errorByteArray)).when(process).getErrorStream(); + checkDrainBuffers(process, outputString, errorString, maxLength); + } + + @Test + public void testDrainBuffersFast() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException { + checkDrainBufferFast(sampleString, "", suggestedMaxLength); + checkDrainBufferFast("", sampleString.toLowerCase(), suggestedMaxLength); + checkDrainBufferFast(sampleString, sampleString.toLowerCase(), suggestedMaxLength); + } + 
}); + } + + private void checkDrainBufferFast(String outputString, String errorString, int suggestedMaxLength) + throws IOException, InterruptedException { + Process process = BlockingWritesExitValueProcess.createFastWritingProcess(outputString, errorString); + int timeout = calculateTimeoutForTest(false, null); + drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength); + } + + @Test + public void testDrainBuffersSlowWrite() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException { + checkDrainBufferSlow( sampleString, "", suggestedMaxLength); + checkDrainBufferSlow("", sampleString.toLowerCase(), suggestedMaxLength); + checkDrainBufferSlow(sampleString, sampleString.toLowerCase(), suggestedMaxLength); + } + }); + } + + private void checkDrainBufferSlow(String outputString, String errorString, int suggestedMaxLength) + throws IOException, InterruptedException { + int catBufferSize = Math.max(outputString.length()/2, errorString.length()/2); + Process process = BlockingWritesExitValueProcess.createBufferLimitedProcess(outputString, errorString, catBufferSize); + int []defaultPauseTimes = {500}; + int timeout = calculateTimeoutForTest(true, defaultPauseTimes); + drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength); + } + + private void drainProcessAndCheckTimeout(Process process, int timeout, String outputString, String errorString, int maxLength) + throws IOException, InterruptedException { + long timeBefore = System.currentTimeMillis(); + assertionError = null; + Thread t = createBufferDrainerThread(process, outputString, errorString, maxLength); + t.start(); + t.join(timeout); + long timeToRun = System.currentTimeMillis() - timeBefore; + assertTrue("drainBuffer test timed out after "+ timeToRun+" ms", timeToRun < timeout); + if 
(assertionError != null) { + throw assertionError; + } + if (ioException != null) { + throw ioException; + } + } + + private int calculateTimeoutForTest(boolean simulateSlowWriting, int[] pauseTimes) { + int basicTimeout = 1000; + if (simulateSlowWriting) { + int timeout = basicTimeout; + for (int i=0; i<4; ++i) { + int nextPause = pauseTimes == null ? 500 : pauseTimes[i % pauseTimes.length]; + timeout += 2 * nextPause; + } + return timeout; + } + else { + return basicTimeout; + } + } + + @Test + public void testDrainBuffersLongPause() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException { + // only run some of the tests because of the speed limit + boolean runThisTest = sampleString.length() == 0 || sampleString.length() < 1024*1024 && suggestedMaxLength > 1024; + if (runThisTest) { + checkDrainBuffersLongPause( sampleString, "", suggestedMaxLength); + checkDrainBuffersLongPause("", sampleString.toLowerCase(), suggestedMaxLength); + checkDrainBuffersLongPause(sampleString, sampleString.toLowerCase(), suggestedMaxLength); + } + } + }); + } + + private void checkDrainBuffersLongPause(String outputString, String errorString, int suggestedMaxLength) + throws IOException, InterruptedException { + int[] longPauseMs = {10*1000}; + Process process = BlockingWritesExitValueProcess.createPausedProcess(outputString, errorString, longPauseMs); + int timeout = calculateTimeoutForTest(true, longPauseMs); + drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength); + } + + @Test + public void testDrainBuffersRandomPause() throws Exception { + checkSampleStringWithDifferentMaxLength( new StringAndIntProcessingCallback() { + public void call(String sampleString, int suggestedMaxLength) throws IOException, InterruptedException { + checkDrainBuffersRandomPause( sampleString, "", 
suggestedMaxLength); + checkDrainBuffersRandomPause("", sampleString.toLowerCase(), suggestedMaxLength); + checkDrainBuffersRandomPause(sampleString, sampleString.toLowerCase(), suggestedMaxLength); + } + }); + } + + private void checkDrainBuffersRandomPause(String outputString, String errorString, int suggestedMaxLength) + throws IOException, InterruptedException { + int[] pauseTimes = generateRandomPauseIntervals(); + Process process = BlockingWritesExitValueProcess.createPausedProcess(outputString, errorString, pauseTimes); + int timeout = calculateTimeoutForTest(true, pauseTimes); + drainProcessAndCheckTimeout(process, timeout, outputString, errorString, suggestedMaxLength); + } + + private int[] generateRandomPauseIntervals() { + int[] longPauseMs = new int[4]; + for (int i=0; i<longPauseMs.length; ++i) { + longPauseMs[i] = new Random().nextInt((2000)) + 100; + } + return longPauseMs; + } + + private void checkDrainBuffers(Process process, String outputString, String errorString, int maxLength) throws IOException { + BufferDrainer bufferDrainer = new BufferDrainer(process, maxLength); + int exitValue = bufferDrainer.drainBuffers(); + assertEquals("Invalid exit Value", BlockingWritesExitValueProcess.EXIT_VALUE, exitValue); + StringBuffer inputBuffer = bufferDrainer.getInputBuffer(); + StringBuffer errorBuffer = bufferDrainer.getErrorBuffer(); + assertTrue("Invalid input buffer length", inputBuffer.toString().length() >= Math.min(outputString.length(), maxLength)); + assertTrue("Invalid input buffer", outputString.startsWith(inputBuffer.toString())); + assertTrue("Invalid error buffer", errorString.startsWith(errorBuffer.toString())); + } + + @Test + public void testParallelDrainBuffers() throws Exception { + String sampleString = generateString(1024); + checkParallelDrainBuffers(20, sampleString, sampleString.toLowerCase(), false); + } + + @Test + public void testParallelDrainBuffersWithFailure() throws Exception { + String sampleString = generateString(1024); 
+ checkParallelDrainBuffers(20, sampleString, sampleString.toLowerCase(), true); + } + + private void checkParallelDrainBuffers(int threadNum, String outputString, String errorString, boolean alsoAddFailingThread) + throws IOException, InterruptedException { + Thread []threads = new Thread[threadNum]; + assertionError = null; + int maxTimeout = 0; + if (alsoAddFailingThread) { + BlockingWritesExitValueProcess failingProcess = BlockingWritesExitValueProcess.createFastWritingProcess("",""); + createBufferDrainerThread(failingProcess, "", "", 1024).start(); + failingProcess.simulateFailure(); + } + for (int i=0; i<threads.length; ++i) { + String randomizedOutputString = randomSubstring(outputString); + String randomizedErrorString = randomSubstring(errorString); + int[] pauseTimes = generateRandomPauseIntervals(); + Process process = BlockingWritesExitValueProcess.createPausedProcess(randomizedOutputString, randomizedErrorString, + pauseTimes); + int timeout = calculateTimeoutForTest(true, pauseTimes); + if (timeout > maxTimeout) { + maxTimeout = timeout; + } + threads[i] = createBufferDrainerThread(process, randomizedOutputString, randomizedErrorString, 1024); + threads[i].start(); + } + waitThreadsToFinishInTime(threads, maxTimeout); + if (assertionError != null) { + throw assertionError; + } + if (ioException != null) { + throw ioException; + } + } + + private Thread createBufferDrainerThread(Process process, String randomizedOutputString, String randomizedErrorString, + int maxLength) { + return new Thread() { + public void run() { + try { + checkDrainBuffers(process, randomizedOutputString, randomizedErrorString, maxLength); + } catch (IOException e) { + ioException = e; + } catch (AssertionError e) { + assertionError = e; + } + } + }; + } + + private void waitThreadsToFinishInTime(Thread[] threads, int maxTimeout) throws InterruptedException { + long timeBefore = System.currentTimeMillis(); + for (Thread t : threads) { + long timeToRunSoFar = 
System.currentTimeMillis() - timeBefore; + long remainingTime = maxTimeout - timeToRunSoFar; + if (remainingTime > 0) { + t.join(remainingTime); + } + else { + fail("drainBuffer test timed out after "+ timeToRunSoFar+" ms"); + } + } + long timeToRunTotal = System.currentTimeMillis() - timeBefore; + assertTrue("drainBuffer test timed out after "+ timeToRunTotal+" ms", timeToRunTotal < maxTimeout); + } + + private String randomSubstring(String string) { + int minLength = (int)(0.8 * string.length()); + int maxLength = string.length(); + int length = new Random().nextInt((maxLength - minLength) + 1) + minLength; + return string.substring(0, length); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/8013a945/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6e30b3e..b0dd230 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.2.0 release (trunk - unreleased) +OOZIE-3382 [SSH action] [SSH action] Optimize process streams draining (asalamon74 via andras.piros) OOZIE-3384 [tests] TestWorkflowActionRetryInfoXCommand#testRetryConsoleUrlForked() is flaky (asalamon74 via kmarton) OOZIE-3386 Misleading error message when workflow application does not exist (kmarton) OOZIE-3120 Upgrade maven-assembly plugin to v.3.1.0 (dionusos via kmarton)