This is an automated email from the ASF dual-hosted git repository.
sjaranowski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git
The following commit(s) were added to refs/heads/master by this push:
new b60e8a2 [MSHARED-1072] fix blocking in StreamFeeder (#113)
b60e8a2 is described below
commit b60e8a2fbeadb5b9d150173a3e1c1e2a000f2aab
Author: Slawomir Jaranowski <[email protected]>
AuthorDate: Fri Apr 21 21:14:30 2023 +0200
[MSHARED-1072] fix blocking in StreamFeeder (#113)
If the input stream had no more available data,
StreamFeeder would block forever.
---
.../utils/cli/CommandLineTimeOutException.java | 12 ++-
.../maven/shared/utils/cli/CommandLineUtils.java | 73 +++--------------
.../maven/shared/utils/cli/StreamFeeder.java | 89 +++++++++------------
.../maven/shared/utils/cli/StreamFeederTest.java | 91 ++++++++++++++++++++++
4 files changed, 148 insertions(+), 117 deletions(-)
diff --git
a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
index 8aead3d..88edb65 100644
---
a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
+++
b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
@@ -19,14 +19,13 @@
package org.apache.maven.shared.utils.cli;
/**
+ * Report a timeout for executing process.
+ *
* @author Olivier Lamy
*
*/
public class CommandLineTimeOutException extends CommandLineException {
- /**
- *
- */
private static final long serialVersionUID = 7322428741683224481L;
/**
@@ -36,4 +35,11 @@ public class CommandLineTimeOutException extends
CommandLineException {
public CommandLineTimeOutException(String message, Throwable cause) {
super(message, cause);
}
+
+ /**
+ * @param message The message of the exception.
+ */
+ public CommandLineTimeOutException(String message) {
+ super(message);
+ }
}
diff --git
a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
index 5422af8..850d5b1 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
@@ -29,6 +29,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
import org.apache.maven.shared.utils.Os;
import org.apache.maven.shared.utils.StringUtils;
@@ -45,7 +46,7 @@ public abstract class CommandLineUtils {
*/
public static class StringStreamConsumer implements StreamConsumer {
- private final StringBuffer string = new StringBuffer();
+ private final StringBuilder string = new StringBuilder();
private static final String LS = System.getProperty("line.separator",
"\n");
@@ -65,16 +66,6 @@ public abstract class CommandLineUtils {
}
}
- /**
- * Number of milliseconds per second.
- */
- private static final long MILLIS_PER_SECOND = 1000L;
-
- /**
- * Number of nanoseconds per second.
- */
- private static final long NANOS_PER_SECOND = 1000000000L;
-
/**
* @param cl The command line {@link Commandline}
* @param systemOut {@link StreamConsumer}
@@ -277,26 +268,13 @@ public abstract class CommandLineUtils {
errorPumper.setName("StreamPumper-systemErr");
errorPumper.start();
- int returnValue;
- if (timeoutInSeconds <= 0) {
- returnValue = p.waitFor();
- } else {
- final long now = System.nanoTime();
- final long timeout = now + NANOS_PER_SECOND *
timeoutInSeconds;
- while (isAlive(p) && (System.nanoTime() < timeout)) {
- // The timeout is specified in seconds. Therefore
we must not sleep longer than one second
- // but we should sleep as long as possible to
reduce the number of iterations performed.
- Thread.sleep(MILLIS_PER_SECOND - 1L);
- }
-
- if (isAlive(p)) {
- throw new InterruptedException(
- String.format("Process timed out after %d
seconds.", timeoutInSeconds));
- }
-
- returnValue = p.exitValue();
+ if (timeoutInSeconds > 0 && !p.waitFor(timeoutInSeconds,
TimeUnit.SECONDS)) {
+ throw new CommandLineTimeOutException(
+ String.format("Process timed out after %d
seconds.", timeoutInSeconds));
}
+ int returnValue = p.waitFor();
+
// TODO Find out if waitUntilDone needs to be called using
a try-finally construct. The method may
// throw an
// InterruptedException so that calls to
waitUntilDone may be skipped.
@@ -325,12 +303,8 @@ public abstract class CommandLineUtils {
outputPumper.waitUntilDone();
errorPumper.waitUntilDone();
- if (inputFeeder != null) {
- inputFeeder.close();
-
- if (inputFeeder.getException() != null) {
- throw new CommandLineException("Failure processing
stdin.", inputFeeder.getException());
- }
+ if (inputFeeder != null && inputFeeder.getException() !=
null) {
+ throw new CommandLineException("Failure processing
stdin.", inputFeeder.getException());
}
if (outputPumper.getException() != null) {
@@ -343,13 +317,10 @@ public abstract class CommandLineUtils {
return returnValue;
} catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
throw new CommandLineTimeOutException(
"Error while executing external command, process
killed.", ex);
-
} finally {
- if (inputFeeder != null) {
- inputFeeder.disable();
- }
if (outputPumper != null) {
outputPumper.disable();
}
@@ -363,14 +334,7 @@ public abstract class CommandLineUtils {
}
} finally {
ShutdownHookUtils.removeShutdownHook(processHook);
-
- try {
- processHook.run();
- } finally {
- if (inputFeeder != null) {
- inputFeeder.close();
- }
- }
+ processHook.run();
}
}
}
@@ -405,19 +369,6 @@ public abstract class CommandLineUtils {
return ensureCaseSensitivity(envs, caseSensitive);
}
- private static boolean isAlive(Process p) {
- if (p == null) {
- return false;
- }
-
- try {
- p.exitValue();
- return false;
- } catch (IllegalThreadStateException e) {
- return true;
- }
- }
-
/**
* @param toProcess The command line to translate.
* @return The array of translated parts.
@@ -436,7 +387,7 @@ public abstract class CommandLineUtils {
boolean inEscape = false;
int state = normal;
final StringTokenizer tok = new StringTokenizer(toProcess, "\"\' \\",
true);
- List<String> tokens = new ArrayList<String>();
+ List<String> tokens = new ArrayList<>();
StringBuilder current = new StringBuilder();
while (tok.hasMoreTokens()) {
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
index e5743a8..f26db63 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
@@ -21,72 +21,62 @@ package org.apache.maven.shared.utils.cli;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Objects;
/**
* Read from an InputStream and write the output to an OutputStream.
*
* @author <a href="mailto:[email protected]">Trygve Laugstøl</a>
*/
-class StreamFeeder extends AbstractStreamHandler {
+class StreamFeeder extends Thread {
- private final AtomicReference<InputStream> input;
+ private final InputStream input;
- private final AtomicReference<OutputStream> output;
+ private final OutputStream output;
- private volatile Throwable exception;
+ private Throwable exception;
+ private boolean done;
+
+ private final Object lock = new Object();
/**
* Create a new StreamFeeder
*
- * @param input Stream to read from
+ * @param input Stream to read from
* @param output Stream to write to
*/
StreamFeeder(InputStream input, OutputStream output) {
- super();
- this.input = new AtomicReference<InputStream>(input);
- this.output = new AtomicReference<OutputStream>(output);
+ this.input = Objects.requireNonNull(input);
+ this.output = Objects.requireNonNull(output);
+ this.done = false;
}
@Override
+ @SuppressWarnings("checkstyle:innerassignment")
public void run() {
try {
- feed();
- } catch (Throwable e) {
- // Catch everything so the streams will be closed and flagged as
done.
- if (this.exception != null) {
- this.exception = e;
+ for (int data; !isInterrupted() && (data = input.read()) != -1; ) {
+ output.write(data);
}
+ output.flush();
+ } catch (IOException e) {
+ exception = e;
} finally {
close();
-
- synchronized (this) {
- notifyAll();
- }
}
- }
- public void close() {
- setDone();
- final InputStream is = input.getAndSet(null);
- if (is != null) {
- try {
- is.close();
- } catch (IOException ex) {
- if (this.exception != null) {
- this.exception = ex;
- }
- }
+ synchronized (lock) {
+ done = true;
+ lock.notifyAll();
}
+ }
- final OutputStream os = output.getAndSet(null);
- if (os != null) {
- try {
- os.close();
- } catch (IOException ex) {
- if (this.exception != null) {
- this.exception = ex;
- }
+ private void close() {
+ try {
+ output.close();
+ } catch (IOException e) {
+ if (exception == null) {
+ exception = e;
}
}
}
@@ -98,23 +88,16 @@ class StreamFeeder extends AbstractStreamHandler {
return this.exception;
}
- @SuppressWarnings("checkstyle:innerassignment")
- private void feed() throws IOException {
- InputStream is = input.get();
- OutputStream os = output.get();
- boolean flush = false;
-
- if (is != null && os != null) {
- for (int data; !isDone() && (data = is.read()) != -1; ) {
- if (!isDisabled()) {
- os.write(data);
- flush = true;
+ public void waitUntilDone() {
+ this.interrupt();
+ synchronized (lock) {
+ while (!done) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
-
- if (flush) {
- os.flush();
- }
}
}
}
diff --git
a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
new file mode 100644
index 0000000..8decce0
--- /dev/null
+++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.maven.shared.utils.cli;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamFeederTest {
+ static class BlockingInputStream extends ByteArrayInputStream {
+ public BlockingInputStream(byte[] buf) {
+ super(buf);
+ }
+
+ @Override
+ public synchronized int read() {
+ int data = super.read();
+ if (data >= 0) {
+ return data;
+ }
+
+ // end test data ... block
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return -1;
+ }
+ }
+
+ @Test
+ public void waitUntilFeederDone() throws InterruptedException {
+
+ String TEST_DATA = "TestData";
+
+ BlockingInputStream inputStream = new
BlockingInputStream(TEST_DATA.getBytes());
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ StreamFeeder streamFeeder = new StreamFeeder(inputStream,
outputStream);
+
+ streamFeeder.start();
+
+ // wait until all data from the stream has been read
+ while (outputStream.size() < TEST_DATA.length()) {
+ Thread.sleep(10);
+ }
+
+ streamFeeder.waitUntilDone(); // wait until process finish
+
+ assertEquals(TEST_DATA, outputStream.toString());
+ }
+}