This is an automated email from the ASF dual-hosted git repository.
sjaranowski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git
The following commit(s) were added to refs/heads/master by this push:
new 1c264ba [MSHARED-1072] Poll data from input stream
1c264ba is described below
commit 1c264bae8d9c4488dc11beaa3c51e73227ab0cb0
Author: Slawomir Jaranowski <[email protected]>
AuthorDate: Sat May 6 11:40:18 2023 +0200
[MSHARED-1072] Poll data from input stream
The input stream can be System.in, in which case every read will block.
We need to read data in non-blocking mode.
---
.../maven/shared/utils/cli/CommandLineUtils.java | 6 +-
.../{StreamFeeder.java => StreamPollFeeder.java} | 60 ++++++++------
.../maven/shared/utils/cli/StreamFeederTest.java | 91 ----------------------
.../shared/utils/cli/StreamPollFeederTest.java | 76 ++++++++++++++++++
4 files changed, 116 insertions(+), 117 deletions(-)
diff --git
a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
index 850d5b1..96714b4 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
@@ -250,13 +250,13 @@ public abstract class CommandLineUtils {
@Override
public Integer call() throws CommandLineException {
- StreamFeeder inputFeeder = null;
+ StreamPollFeeder inputFeeder = null;
StreamPumper outputPumper = null;
StreamPumper errorPumper = null;
try {
if (systemIn != null) {
- inputFeeder = new StreamFeeder(systemIn,
p.getOutputStream());
- inputFeeder.setName("StreamFeeder-systemIn");
+ inputFeeder = new StreamPollFeeder(systemIn,
p.getOutputStream());
+ inputFeeder.setName("StreamPollFeeder-systemIn");
inputFeeder.start();
}
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
b/src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java
similarity index 68%
rename from src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
rename to src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java
index f26db63..27d75bb 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamPollFeeder.java
@@ -24,51 +24,64 @@ import java.io.OutputStream;
import java.util.Objects;
/**
- * Read from an InputStream and write the output to an OutputStream.
+ * Poll InputStream for available data and write the output to an OutputStream.
*
* @author <a href="mailto:[email protected]">Trygve Laugstøl</a>
*/
-class StreamFeeder extends Thread {
+class StreamPollFeeder extends Thread {
- private final InputStream input;
+ public static final int BUF_LEN = 80;
+ private final InputStream input;
private final OutputStream output;
private Throwable exception;
- private boolean done;
+ private boolean done;
private final Object lock = new Object();
/**
- * Create a new StreamFeeder
+ * Create a new StreamPollFeeder
*
* @param input Stream to read from
* @param output Stream to write to
*/
- StreamFeeder(InputStream input, OutputStream output) {
+ StreamPollFeeder(InputStream input, OutputStream output) {
this.input = Objects.requireNonNull(input);
this.output = Objects.requireNonNull(output);
this.done = false;
}
@Override
- @SuppressWarnings("checkstyle:innerassignment")
public void run() {
+
+ byte[] buf = new byte[BUF_LEN];
+
try {
- for (int data; !isInterrupted() && (data = input.read()) != -1; ) {
- output.write(data);
+ while (!done) {
+ if (input.available() > 0) {
+ int i = input.read(buf);
+ if (i > 0) {
+ output.write(buf, 0, i);
+ output.flush();
+ } else {
+ done = true;
+ }
+ } else {
+ synchronized (lock) {
+ if (!done) {
+ lock.wait(100);
+ }
+ }
+ }
}
- output.flush();
} catch (IOException e) {
exception = e;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
} finally {
close();
}
-
- synchronized (lock) {
- done = true;
- lock.notifyAll();
- }
}
private void close() {
@@ -89,15 +102,16 @@ class StreamFeeder extends Thread {
}
public void waitUntilDone() {
- this.interrupt();
+
synchronized (lock) {
- while (!done) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
+ done = true;
+ lock.notifyAll();
+ }
+
+ try {
+ join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
}
diff --git
a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
deleted file mode 100644
index 8decce0..0000000
--- a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.maven.shared.utils.cli;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class StreamFeederTest {
- static class BlockingInputStream extends ByteArrayInputStream {
- public BlockingInputStream(byte[] buf) {
- super(buf);
- }
-
- @Override
- public synchronized int read() {
- int data = super.read();
- if (data >= 0) {
- return data;
- }
-
- // end test data ... block
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return -1;
- }
- }
-
- @Test
- public void waitUntilFeederDone() throws InterruptedException {
-
- String TEST_DATA = "TestData";
-
- BlockingInputStream inputStream = new
BlockingInputStream(TEST_DATA.getBytes());
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
- StreamFeeder streamFeeder = new StreamFeeder(inputStream,
outputStream);
-
- streamFeeder.start();
-
- // wait until all data from steam will be read
- while (outputStream.size() < TEST_DATA.length()) {
- Thread.sleep(10);
- }
-
- streamFeeder.waitUntilDone(); // wait until process finish
-
- assertEquals(TEST_DATA, outputStream.toString());
- }
-}
diff --git
a/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java
b/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java
new file mode 100644
index 0000000..7883de7
--- /dev/null
+++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamPollFeederTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.maven.shared.utils.cli;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class StreamPollFeederTest {
+
+ @Test
+ public void waitUntilFeederDoneOnInputStream() throws Exception {
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ StreamPollFeeder streamPollFeeder = new StreamPollFeeder(System.in,
outputStream);
+
+ // start thread
+ streamPollFeeder.start();
+
+ // wait a moment
+ Thread.sleep(100);
+
+ // wait until process finish
+ streamPollFeeder.waitUntilDone();
+ assertNull(streamPollFeeder.getException());
+ }
+
+ @Test
+ public void dataShouldBeCopied() throws InterruptedException, IOException {
+
+ StringBuilder TEST_DATA = new StringBuilder();
+ for (int i = 0; i < 100; i++) {
+ TEST_DATA.append("TestData");
+ }
+
+ ByteArrayInputStream inputStream =
+ new ByteArrayInputStream(TEST_DATA.toString().getBytes());
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ StreamPollFeeder streamPollFeeder = new StreamPollFeeder(inputStream,
outputStream);
+
+ streamPollFeeder.start();
+
+ // wait until all data from steam will be read
+ while (outputStream.size() < TEST_DATA.length()) {
+ Thread.sleep(100);
+ }
+
+ // wait until process finish
+ streamPollFeeder.waitUntilDone();
+ assertNull(streamPollFeeder.getException());
+
+ assertEquals(TEST_DATA.toString(), outputStream.toString());
+ }
+}