This is an automated email from the ASF dual-hosted git repository.

sjaranowski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git


The following commit(s) were added to refs/heads/master by this push:
     new b60e8a2  [MSHARED-1072] fix blocking in StreamFeeder (#113)
b60e8a2 is described below

commit b60e8a2fbeadb5b9d150173a3e1c1e2a000f2aab
Author: Slawomir Jaranowski <[email protected]>
AuthorDate: Fri Apr 21 21:14:30 2023 +0200

    [MSHARED-1072] fix blocking in StreamFeeder (#113)
    
    
    If the input stream has no more available data,
    StreamFeeder would block forever.
---
 .../utils/cli/CommandLineTimeOutException.java     | 12 ++-
 .../maven/shared/utils/cli/CommandLineUtils.java   | 73 +++--------------
 .../maven/shared/utils/cli/StreamFeeder.java       | 89 +++++++++------------
 .../maven/shared/utils/cli/StreamFeederTest.java   | 91 ++++++++++++++++++++++
 4 files changed, 148 insertions(+), 117 deletions(-)

diff --git 
a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
 
b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
index 8aead3d..88edb65 100644
--- 
a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
+++ 
b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
@@ -19,14 +19,13 @@
 package org.apache.maven.shared.utils.cli;
 
 /**
+ * Report a timeout for executing process.
+ *
  * @author Olivier Lamy
  *
  */
 public class CommandLineTimeOutException extends CommandLineException {
 
-    /**
-     *
-     */
     private static final long serialVersionUID = 7322428741683224481L;
 
     /**
@@ -36,4 +35,11 @@ public class CommandLineTimeOutException extends 
CommandLineException {
     public CommandLineTimeOutException(String message, Throwable cause) {
         super(message, cause);
     }
+
+    /**
+     * @param message The message of the exception.
+     */
+    public CommandLineTimeOutException(String message) {
+        super(message);
+    }
 }
diff --git 
a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java 
b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
index 5422af8..850d5b1 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
@@ -29,6 +29,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.maven.shared.utils.Os;
 import org.apache.maven.shared.utils.StringUtils;
@@ -45,7 +46,7 @@ public abstract class CommandLineUtils {
      */
     public static class StringStreamConsumer implements StreamConsumer {
 
-        private final StringBuffer string = new StringBuffer();
+        private final StringBuilder string = new StringBuilder();
 
         private static final String LS = System.getProperty("line.separator", 
"\n");
 
@@ -65,16 +66,6 @@ public abstract class CommandLineUtils {
         }
     }
 
-    /**
-     * Number of milliseconds per second.
-     */
-    private static final long MILLIS_PER_SECOND = 1000L;
-
-    /**
-     * Number of nanoseconds per second.
-     */
-    private static final long NANOS_PER_SECOND = 1000000000L;
-
     /**
      * @param cl The command line {@link Commandline}
      * @param systemOut {@link StreamConsumer}
@@ -277,26 +268,13 @@ public abstract class CommandLineUtils {
                     errorPumper.setName("StreamPumper-systemErr");
                     errorPumper.start();
 
-                    int returnValue;
-                    if (timeoutInSeconds <= 0) {
-                        returnValue = p.waitFor();
-                    } else {
-                        final long now = System.nanoTime();
-                        final long timeout = now + NANOS_PER_SECOND * 
timeoutInSeconds;
-                        while (isAlive(p) && (System.nanoTime() < timeout)) {
-                            // The timeout is specified in seconds. Therefore 
we must not sleep longer than one second
-                            // but we should sleep as long as possible to 
reduce the number of iterations performed.
-                            Thread.sleep(MILLIS_PER_SECOND - 1L);
-                        }
-
-                        if (isAlive(p)) {
-                            throw new InterruptedException(
-                                    String.format("Process timed out after %d 
seconds.", timeoutInSeconds));
-                        }
-
-                        returnValue = p.exitValue();
+                    if (timeoutInSeconds > 0 && !p.waitFor(timeoutInSeconds, 
TimeUnit.SECONDS)) {
+                        throw new CommandLineTimeOutException(
+                                String.format("Process timed out after %d 
seconds.", timeoutInSeconds));
                     }
 
+                    int returnValue = p.waitFor();
+
                     // TODO Find out if waitUntilDone needs to be called using 
a try-finally construct. The method may
                     // throw an
                     //      InterruptedException so that calls to 
waitUntilDone may be skipped.
@@ -325,12 +303,8 @@ public abstract class CommandLineUtils {
                     outputPumper.waitUntilDone();
                     errorPumper.waitUntilDone();
 
-                    if (inputFeeder != null) {
-                        inputFeeder.close();
-
-                        if (inputFeeder.getException() != null) {
-                            throw new CommandLineException("Failure processing 
stdin.", inputFeeder.getException());
-                        }
+                    if (inputFeeder != null && inputFeeder.getException() != 
null) {
+                        throw new CommandLineException("Failure processing 
stdin.", inputFeeder.getException());
                     }
 
                     if (outputPumper.getException() != null) {
@@ -343,13 +317,10 @@ public abstract class CommandLineUtils {
 
                     return returnValue;
                 } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                     throw new CommandLineTimeOutException(
                             "Error while executing external command, process 
killed.", ex);
-
                 } finally {
-                    if (inputFeeder != null) {
-                        inputFeeder.disable();
-                    }
                     if (outputPumper != null) {
                         outputPumper.disable();
                     }
@@ -363,14 +334,7 @@ public abstract class CommandLineUtils {
                         }
                     } finally {
                         ShutdownHookUtils.removeShutdownHook(processHook);
-
-                        try {
-                            processHook.run();
-                        } finally {
-                            if (inputFeeder != null) {
-                                inputFeeder.close();
-                            }
-                        }
+                        processHook.run();
                     }
                 }
             }
@@ -405,19 +369,6 @@ public abstract class CommandLineUtils {
         return ensureCaseSensitivity(envs, caseSensitive);
     }
 
-    private static boolean isAlive(Process p) {
-        if (p == null) {
-            return false;
-        }
-
-        try {
-            p.exitValue();
-            return false;
-        } catch (IllegalThreadStateException e) {
-            return true;
-        }
-    }
-
     /**
      * @param toProcess The command line to translate.
      * @return The array of translated parts.
@@ -436,7 +387,7 @@ public abstract class CommandLineUtils {
         boolean inEscape = false;
         int state = normal;
         final StringTokenizer tok = new StringTokenizer(toProcess, "\"\' \\", 
true);
-        List<String> tokens = new ArrayList<String>();
+        List<String> tokens = new ArrayList<>();
         StringBuilder current = new StringBuilder();
 
         while (tok.hasMoreTokens()) {
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java 
b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
index e5743a8..f26db63 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
@@ -21,72 +21,62 @@ package org.apache.maven.shared.utils.cli;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Objects;
 
 /**
  * Read from an InputStream and write the output to an OutputStream.
  *
  * @author <a href="mailto:[email protected]";>Trygve Laugst&oslash;l</a>
  */
-class StreamFeeder extends AbstractStreamHandler {
+class StreamFeeder extends Thread {
 
-    private final AtomicReference<InputStream> input;
+    private final InputStream input;
 
-    private final AtomicReference<OutputStream> output;
+    private final OutputStream output;
 
-    private volatile Throwable exception;
+    private Throwable exception;
+    private boolean done;
+
+    private final Object lock = new Object();
 
     /**
      * Create a new StreamFeeder
      *
-     * @param input Stream to read from
+     * @param input  Stream to read from
      * @param output Stream to write to
      */
     StreamFeeder(InputStream input, OutputStream output) {
-        super();
-        this.input = new AtomicReference<InputStream>(input);
-        this.output = new AtomicReference<OutputStream>(output);
+        this.input = Objects.requireNonNull(input);
+        this.output = Objects.requireNonNull(output);
+        this.done = false;
     }
 
     @Override
+    @SuppressWarnings("checkstyle:innerassignment")
     public void run() {
         try {
-            feed();
-        } catch (Throwable e) {
-            // Catch everything so the streams will be closed and flagged as 
done.
-            if (this.exception != null) {
-                this.exception = e;
+            for (int data; !isInterrupted() && (data = input.read()) != -1; ) {
+                output.write(data);
             }
+            output.flush();
+        } catch (IOException e) {
+            exception = e;
         } finally {
             close();
-
-            synchronized (this) {
-                notifyAll();
-            }
         }
-    }
 
-    public void close() {
-        setDone();
-        final InputStream is = input.getAndSet(null);
-        if (is != null) {
-            try {
-                is.close();
-            } catch (IOException ex) {
-                if (this.exception != null) {
-                    this.exception = ex;
-                }
-            }
+        synchronized (lock) {
+            done = true;
+            lock.notifyAll();
         }
+    }
 
-        final OutputStream os = output.getAndSet(null);
-        if (os != null) {
-            try {
-                os.close();
-            } catch (IOException ex) {
-                if (this.exception != null) {
-                    this.exception = ex;
-                }
+    private void close() {
+        try {
+            output.close();
+        } catch (IOException e) {
+            if (exception == null) {
+                exception = e;
             }
         }
     }
@@ -98,23 +88,16 @@ class StreamFeeder extends AbstractStreamHandler {
         return this.exception;
     }
 
-    @SuppressWarnings("checkstyle:innerassignment")
-    private void feed() throws IOException {
-        InputStream is = input.get();
-        OutputStream os = output.get();
-        boolean flush = false;
-
-        if (is != null && os != null) {
-            for (int data; !isDone() && (data = is.read()) != -1; ) {
-                if (!isDisabled()) {
-                    os.write(data);
-                    flush = true;
+    public void waitUntilDone() {
+        this.interrupt();
+        synchronized (lock) {
+            while (!done) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
             }
-
-            if (flush) {
-                os.flush();
-            }
         }
     }
 }
diff --git 
a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java 
b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
new file mode 100644
index 0000000..8decce0
--- /dev/null
+++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.maven.shared.utils.cli;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamFeederTest {
+    static class BlockingInputStream extends ByteArrayInputStream {
+        public BlockingInputStream(byte[] buf) {
+            super(buf);
+        }
+
+        @Override
+        public synchronized int read() {
+            int data = super.read();
+            if (data >= 0) {
+                return data;
+            }
+
+            // end test data ... block
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            return -1;
+        }
+    }
+
+    @Test
+    public void waitUntilFeederDone() throws InterruptedException {
+
+        String TEST_DATA = "TestData";
+
+        BlockingInputStream inputStream = new 
BlockingInputStream(TEST_DATA.getBytes());
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        StreamFeeder streamFeeder = new StreamFeeder(inputStream, 
outputStream);
+
+        streamFeeder.start();
+
+        // wait until all data from the stream has been read
+        while (outputStream.size() < TEST_DATA.length()) {
+            Thread.sleep(10);
+        }
+
+        streamFeeder.waitUntilDone(); // wait until process finish
+
+        assertEquals(TEST_DATA, outputStream.toString());
+    }
+}

Reply via email to