This is an automated email from the ASF dual-hosted git repository.

sjaranowski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c264ba  [MSHARED-1072] Poll data from input stream
1c264ba is described below

commit 1c264bae8d9c4488dc11beaa3c51e73227ab0cb0
Author: Slawomir Jaranowski <[email protected]>
AuthorDate: Sat May 6 11:40:18 2023 +0200

    [MSHARED-1072] Poll data from input stream
    
    The input stream can be System.in, in which case every read blocks.
    We need to read the data in non-blocking mode.
---
 .../maven/shared/utils/cli/CommandLineUtils.java   |  6 +-
 .../{StreamFeeder.java => StreamPollFeeder.java}   | 60 ++++++++------
 .../maven/shared/utils/cli/StreamFeederTest.java   | 91 ----------------------
 .../shared/utils/cli/StreamPollFeederTest.java     | 76 ++++++++++++++++++
 4 files changed, 116 insertions(+), 117 deletions(-)

diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
index 850d5b1..96714b4 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
@@ -250,13 +250,13 @@ public abstract class CommandLineUtils {
 
             @Override
             public Integer call() throws CommandLineException {
-                StreamFeeder inputFeeder = null;
+                StreamPollFeeder inputFeeder = null;
                 StreamPumper outputPumper = null;
                 StreamPumper errorPumper = null;
                 try {
                     if (systemIn != null) {
-                        inputFeeder = new StreamFeeder(systemIn, p.getOutputStream());
-                        inputFeeder.setName("StreamFeeder-systemIn");
+                        inputFeeder = new StreamPollFeeder(systemIn, p.getOutputStream());
+                        inputFeeder.setName("StreamPollFeeder-systemIn");
                         inputFeeder.start();
                     }
 
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java b/src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java
similarity index 68%
rename from src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
rename to src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java
index f26db63..27d75bb 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java
@@ -24,51 +24,64 @@ import java.io.OutputStream;
 import java.util.Objects;
 
 /**
- * Read from an InputStream and write the output to an OutputStream.
+ * Poll InputStream for available data and write the output to an OutputStream.
  *
  * @author <a href="mailto:[email protected]";>Trygve Laugst&oslash;l</a>
  */
-class StreamFeeder extends Thread {
+class StreamPollFeeder extends Thread {
 
-    private final InputStream input;
+    public static final int BUF_LEN = 80;
 
+    private final InputStream input;
     private final OutputStream output;
 
     private Throwable exception;
-    private boolean done;
 
+    private boolean done;
     private final Object lock = new Object();
 
     /**
-     * Create a new StreamFeeder
+     * Create a new StreamPollFeeder
      *
      * @param input  Stream to read from
      * @param output Stream to write to
      */
-    StreamFeeder(InputStream input, OutputStream output) {
+    StreamPollFeeder(InputStream input, OutputStream output) {
         this.input = Objects.requireNonNull(input);
         this.output = Objects.requireNonNull(output);
         this.done = false;
     }
 
     @Override
-    @SuppressWarnings("checkstyle:innerassignment")
     public void run() {
+
+        byte[] buf = new byte[BUF_LEN];
+
         try {
-            for (int data; !isInterrupted() && (data = input.read()) != -1; ) {
-                output.write(data);
+            while (!done) {
+                if (input.available() > 0) {
+                    int i = input.read(buf);
+                    if (i > 0) {
+                        output.write(buf, 0, i);
+                        output.flush();
+                    } else {
+                        done = true;
+                    }
+                } else {
+                    synchronized (lock) {
+                        if (!done) {
+                            lock.wait(100);
+                        }
+                    }
+                }
             }
-            output.flush();
         } catch (IOException e) {
             exception = e;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         } finally {
             close();
         }
-
-        synchronized (lock) {
-            done = true;
-            lock.notifyAll();
-        }
     }
 
     private void close() {
@@ -89,15 +102,16 @@ class StreamFeeder extends Thread {
     }
 
     public void waitUntilDone() {
-        this.interrupt();
+
         synchronized (lock) {
-            while (!done) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
+            done = true;
+            lock.notifyAll();
+        }
+
+        try {
+            join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         }
     }
 }
diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
deleted file mode 100644
index 8decce0..0000000
--- a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.maven.shared.utils.cli;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class StreamFeederTest {
-    static class BlockingInputStream extends ByteArrayInputStream {
-        public BlockingInputStream(byte[] buf) {
-            super(buf);
-        }
-
-        @Override
-        public synchronized int read() {
-            int data = super.read();
-            if (data >= 0) {
-                return data;
-            }
-
-            // end test data ... block
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-            return -1;
-        }
-    }
-
-    @Test
-    public void waitUntilFeederDone() throws InterruptedException {
-
-        String TEST_DATA = "TestData";
-
-        BlockingInputStream inputStream = new BlockingInputStream(TEST_DATA.getBytes());
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
-        StreamFeeder streamFeeder = new StreamFeeder(inputStream, outputStream);
-
-        streamFeeder.start();
-
-        // wait until all data from steam will be read
-        while (outputStream.size() < TEST_DATA.length()) {
-            Thread.sleep(10);
-        }
-
-        streamFeeder.waitUntilDone(); // wait until process finish
-
-        assertEquals(TEST_DATA, outputStream.toString());
-    }
-}
diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java
new file mode 100644
index 0000000..7883de7
--- /dev/null
+++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.maven.shared.utils.cli;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class StreamPollFeederTest {
+
+    @Test
+    public void waitUntilFeederDoneOnInputStream() throws Exception {
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        StreamPollFeeder streamPollFeeder = new StreamPollFeeder(System.in, outputStream);
+
+        // start thread
+        streamPollFeeder.start();
+
+        // wait a moment
+        Thread.sleep(100);
+
+        // wait until process finish
+        streamPollFeeder.waitUntilDone();
+        assertNull(streamPollFeeder.getException());
+    }
+
+    @Test
+    public void dataShouldBeCopied() throws InterruptedException, IOException {
+
+        StringBuilder TEST_DATA = new StringBuilder();
+        for (int i = 0; i < 100; i++) {
+            TEST_DATA.append("TestData");
+        }
+
+        ByteArrayInputStream inputStream =
+                new ByteArrayInputStream(TEST_DATA.toString().getBytes());
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        StreamPollFeeder streamPollFeeder = new StreamPollFeeder(inputStream, outputStream);
+
+        streamPollFeeder.start();
+
+        //  wait until all data from steam will be read
+        while (outputStream.size() < TEST_DATA.length()) {
+            Thread.sleep(100);
+        }
+
+        // wait until process finish
+        streamPollFeeder.waitUntilDone();
+        assertNull(streamPollFeeder.getException());
+
+        assertEquals(TEST_DATA.toString(), outputStream.toString());
+    }
+}

Reply via email to