This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push: new d9237fc Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream (#171) d9237fc is described below commit d9237fcadb398f514beddf35c8933a9fc5925a5d Author: maxxedev <5051664+maxxe...@users.noreply.github.com> AuthorDate: Thu Dec 10 11:23:52 2020 -0800 Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream (#171) * Add QueueInput/OutputStream as simpler alternatives to PipedInput/OutputStream. PipedInput/OutputStream in JDK can have surprisingly complex behavior with respect to how threads need to be arranged. QueueInput/OutputStream are much simpler alternatives that is easier to use correctly. * add more tests for edge cases * remove static imports on Objects::requireNonNull * improve documentation and exception handling --- .../apache/commons/io/input/QueueInputStream.java | 90 ++++++++++++++ .../commons/io/output/QueueOutputStream.java | 98 +++++++++++++++ .../commons/io/input/QueueInputStreamTest.java | 137 +++++++++++++++++++++ .../commons/io/output/QueueOutputStreamTest.java | 122 ++++++++++++++++++ 4 files changed, 447 insertions(+) diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java new file mode 100644 index 0000000..4f3ff3a --- /dev/null +++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.input; + +import org.apache.commons.io.output.QueueOutputStream; + +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream. + * + * Example usage: + * <pre> + * QueueInputStream inputStream = new QueueInputStream(); + * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); + * + * outputStream.write("hello world".getBytes(UTF_8)); + * inputStream.read(); + * </pre> + * + * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be + * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is + * attached to initial or current thread. Instances can be used long after the initial threads have exited. + * + * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after + * the stream has been closed without generating an {@code IOException}. 
+ * + * @see QueueOutputStream + * @since 2.9.0 + */ +public class QueueInputStream extends InputStream { + + private final BlockingQueue<Integer> queue; + + /** + * Constructs a QueueInputStream with no limit to internal buffer size + */ + public QueueInputStream() { + this(new LinkedBlockingQueue<>()); + } + + /** + * Constructs a QueueInputStream with given buffer + * + * @param queue backing queue for the stream + */ + public QueueInputStream(final BlockingQueue<Integer> queue) { + this.queue = Objects.requireNonNull(queue, "queue is required"); + } + + /** + * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. + * + * @return QueueOutputStream connected to this stream + */ + public QueueOutputStream newQueueOutputStream() { + return new QueueOutputStream(queue); + } + + /** + * Reads a single byte. + * + * @return either the byte read or {@code -1} if the end of the stream has been reached + */ + @Override + public int read() { + final Integer value = queue.poll(); + return value == null ? -1 : ((0xFF) & value); + } + +} diff --git a/src/main/java/org/apache/commons/io/output/QueueOutputStream.java b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java new file mode 100644 index 0000000..28b3c7d --- /dev/null +++ b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.output; + +import org.apache.commons.io.input.QueueInputStream; + +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's + * written in queue output stream. + * + * Example usage: + * <pre> + * QueueOutputStream outputStream = new QueueOutputStream(); + * QueueInputStream inputStream = outputStream.newQueueInputStream(); + * + * outputStream.write("hello world".getBytes(UTF_8)); + * inputStream.read(); + * </pre> + * + * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be + * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is + * attached to initial or current thread. Instances can be used long after the initial threads have exited. + * + * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after + * the stream has been closed without generating an {@code IOException}. 
+ * + * @see QueueInputStream + * @since 2.9.0 + */ +public class QueueOutputStream extends OutputStream { + + private final BlockingQueue<Integer> queue; + + /** + * Constructs a QueueOutputStream with no limit to internal buffer size + */ + public QueueOutputStream() { + this(new LinkedBlockingQueue<>()); + } + + /** + * Constructs a QueueOutputStream with given buffer + * + * @param queue backing queue for the stream + */ + public QueueOutputStream(final BlockingQueue<Integer> queue) { + this.queue = Objects.requireNonNull(queue, "queue is required"); + } + + /** + * Creates a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the input stream. + * + * @return QueueInputStream connected to this stream + */ + public QueueInputStream newQueueInputStream() { + return new QueueInputStream(queue); + } + + /** + * Writes a single byte. + * + * @throws InterruptedIOException if the thread is interrupted while writing to the queue. + */ + @Override + public void write(final int b) throws InterruptedIOException { + try { + queue.put(0xFF & b); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + final InterruptedIOException interruptedIoException = new InterruptedIOException(); + interruptedIoException.initCause(e); + throw interruptedIoException; + } + } +} + diff --git a/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java new file mode 100644 index 0000000..774b985 --- /dev/null +++ b/src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.input; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.QueueOutputStream; +import org.apache.commons.io.output.QueueOutputStreamTest; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Stream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test {@link QueueInputStream}. 
+ * + * @see QueueOutputStreamTest + */ +public class QueueInputStreamTest { + + public static Stream<Arguments> inputData() { + return Stream.of(Arguments.of(""), + Arguments.of("1"), + Arguments.of("12"), + Arguments.of("1234"), + Arguments.of("12345678"), + Arguments.of(StringUtils.repeat("A", 4095)), + Arguments.of(StringUtils.repeat("A", 4096)), + Arguments.of(StringUtils.repeat("A", 4097)), + Arguments.of(StringUtils.repeat("A", 8191)), + Arguments.of(StringUtils.repeat("A", 8192)), + Arguments.of(StringUtils.repeat("A", 8193)), + Arguments.of(StringUtils.repeat("A", 8192 * 4))); + } + + @ParameterizedTest(name = "inputData={0}") + @MethodSource("inputData") + public void unbufferedReadWrite(final String inputData) throws IOException { + try (final QueueInputStream inputStream = new QueueInputStream(); + final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) { + writeUnbuffered(outputStream, inputData); + final String actualData = readUnbuffered(inputStream); + assertEquals(inputData, actualData); + } + } + + @ParameterizedTest(name = "inputData={0}") + @MethodSource("inputData") + public void bufferedReads(final String inputData) throws IOException { + final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); + try (final BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue)); + final QueueOutputStream outputStream = new QueueOutputStream(queue)) { + outputStream.write(inputData.getBytes(UTF_8)); + final String actualData = IOUtils.toString(inputStream, UTF_8); + assertEquals(inputData, actualData); + } + } + + @ParameterizedTest(name = "inputData={0}") + @MethodSource("inputData") + public void bufferedWrites(final String inputData) throws IOException { + final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); + try (final QueueInputStream inputStream = new QueueInputStream(queue); + final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), 
defaultBufferSize())) { + outputStream.write(inputData.getBytes(UTF_8)); + outputStream.flush(); + final String actualData = readUnbuffered(inputStream); + assertEquals(inputData, actualData); + } + } + + @ParameterizedTest(name = "inputData={0}") + @MethodSource("inputData") + public void bufferedReadWrite(final String inputData) throws IOException { + final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); + try (final BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue)); + final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) { + outputStream.write(inputData.getBytes(UTF_8)); + outputStream.flush(); + final String dataCopy = IOUtils.toString(inputStream, UTF_8); + assertEquals(inputData, dataCopy); + } + } + + @Test + public void testNullArgument() { + assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required"); + } + + private int defaultBufferSize() { + return 8192; + } + + private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws InterruptedIOException { + final byte[] bytes = inputData.getBytes(UTF_8); + for (byte oneByte : bytes) { + outputStream.write(oneByte); + } + } + + private String readUnbuffered(final InputStream inputStream) throws IOException { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + int n = -1; + while ((n = inputStream.read()) != -1) { + byteArrayOutputStream.write(n); + } + return byteArrayOutputStream.toString("UTF-8"); + } +} diff --git a/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java new file mode 100644 index 0000000..28a726c --- /dev/null +++ b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io.output; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.QueueInputStream; +import org.apache.commons.io.input.QueueInputStreamTest; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; + +import java.io.InterruptedIOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test {@link QueueOutputStream} and {@link QueueInputStream} + * + * @see QueueInputStreamTest + */ +public class QueueOutputStreamTest { + + private static final ExecutorService executorService = Executors.newFixedThreadPool(5); + + @AfterAll + public static void afterAll() { + executorService.shutdown(); + } + + @Test + public void writeString() throws Exception { + try (final QueueOutputStream 
outputStream = new QueueOutputStream(); + final QueueInputStream inputStream = outputStream.newQueueInputStream()) { + outputStream.write("ABC".getBytes(UTF_8)); + final String value = IOUtils.toString(inputStream, UTF_8); + assertEquals("ABC", value); + } + } + + @Test + public void writeStringMultiThread() throws Exception { + try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new); + final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) { + callInThrowAwayThread(() -> { + outputStream.write("ABC".getBytes(UTF_8)); + return null; + }); + + final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8)); + assertEquals("ABC", value); + } + } + + @Test + public void writeInterrupted() throws Exception { + try (final QueueOutputStream outputStream = new QueueOutputStream(new LinkedBlockingQueue<>(1)); + final QueueInputStream inputStream = outputStream.newQueueInputStream()) { + + final int timeout = 1; + final Exchanger<Thread> writerThreadExchanger = new Exchanger<>(); + final Exchanger<Exception> exceptionExchanger = new Exchanger<>(); + executorService.submit(() -> { + final Thread writerThread = writerThreadExchanger.exchange(null, timeout, SECONDS); + writerThread.interrupt(); + return null; + }); + + executorService.submit(() -> { + try { + writerThreadExchanger.exchange(Thread.currentThread(), timeout, SECONDS); + outputStream.write("ABC".getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + Thread.interrupted(); //clear interrupt + exceptionExchanger.exchange(e, timeout, SECONDS); + } + return null; + }); + + final Exception exception = exceptionExchanger.exchange(null, timeout, SECONDS); + assertNotNull(exception); + assertEquals(exception.getClass(), InterruptedIOException.class); + } + } + + @Test + public void testNullArgument() { + assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required"); + } + + private 
static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception { + final Exchanger<T> exchanger = new Exchanger<>(); + executorService.submit(() -> { + final T value = callable.call(); + exchanger.exchange(value); + return null; + }); + return exchanger.exchange(null); + } +}