Repository: beam Updated Branches: refs/heads/master d4f9e9268 -> 38f189063
[BEAM-1347] Add utility to be able to model inbound reading as a single input stream Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac7f9739 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac7f9739 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac7f9739 Branch: refs/heads/master Commit: ac7f9739b01626abc559748ae983f6eb988430af Parents: d4f9e92 Author: Luke Cwik <[email protected]> Authored: Tue Jul 25 09:02:41 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu Aug 3 12:19:17 2017 -0700 ---------------------------------------------------------------------- .../beam/fn/harness/stream/DataStreams.java | 158 +++++++++++++++++++ .../beam/fn/harness/stream/DataStreamsTest.java | 92 +++++++++++ 2 files changed, 250 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ac7f9739/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java new file mode 100644 index 0000000..d23d784 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.stream; + +import com.google.common.io.ByteStreams; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; + +/** + * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and + * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as multiple + * {@link ByteString}s. + */ +public class DataStreams { + /** + * Converts multiple {@link ByteString}s into a single {@link InputStream}. + * + * <p>The iterator is accessed lazily. The supplied {@link Iterator} should block until + * either it knows that no more values will be provided or it has the next {@link ByteString}. + */ + public static InputStream inbound(Iterator<ByteString> bytes) { + return new Inbound(bytes); + } + + /** + * Converts a single {@link OutputStream} into multiple {@link ByteString}s. Not yet implemented. + */ + public static OutputStream outbound(CloseableThrowingConsumer<ByteString> consumer) { + // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver + throw new UnsupportedOperationException(); + } + + /** + * An input stream which concatenates multiple {@link ByteString}s. The {@link Iterator} is + * advanced lazily: the next element is only requested once the current one is exhausted. + * + * <p>Closing this input stream has no effect.
+ */ + private static class Inbound extends InputStream { + private static final InputStream EMPTY_STREAM = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; + + private final Iterator<ByteString> bytes; + private InputStream currentStream; + + public Inbound(Iterator<ByteString> bytes) { + this.currentStream = EMPTY_STREAM; + this.bytes = bytes; + } + + @Override + public int read() throws IOException { + int rval; + // Skip exhausted (or empty) ByteStrings until a byte is read or the iterator is drained. + while ((rval = currentStream.read()) == -1 && bytes.hasNext()) { + currentStream = bytes.next().newInput(); + } + return rval; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int remainingLen = len; + while ((remainingLen -= ByteStreams.read( + currentStream, b, off + len - remainingLen, remainingLen)) > 0) { + if (bytes.hasNext()) { + currentStream = bytes.next().newInput(); + } else { + int bytesRead = len - remainingLen; + return bytesRead > 0 ? bytesRead : -1; // -1 only when nothing was read (end of stream) + } + } + return len - remainingLen; + } + } + + /** + * Allows for one or more writing threads to append values to this iterator while one reading + * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is + * available or this has been closed. + * + * <p>External synchronization must be provided if multiple readers would like to access the + * {@link Iterator#hasNext()} and {@link Iterator#next()} methods. + * + * <p>The order of values which are appended to this iterator is nondeterministic when multiple + * threads call {@link #accept(Object)}. + */ + public static class BlockingQueueIterator<T> implements + CloseableThrowingConsumer<T>, Iterator<T> { + private static final Object POISON_PILL = new Object(); + private final BlockingQueue<T> queue; + + /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods.
+ */ + private T currentElement; + + public BlockingQueueIterator(BlockingQueue<T> queue) { + this.queue = queue; + } + + @Override + public void close() throws Exception { + queue.put((T) POISON_PILL); // sentinel compared by identity in hasNext(); never returned + } + + @Override + public void accept(T t) throws Exception { + queue.put(t); + } + + @Override + public boolean hasNext() { + if (currentElement == null) { + try { + currentElement = queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + return currentElement != POISON_PILL; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + T rval = currentElement; + currentElement = null; + return rval; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ac7f9739/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java new file mode 100644 index 0000000..d141570 --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.stream; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Iterators; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.SynchronousQueue; +import org.apache.beam.fn.harness.stream.DataStreams.BlockingQueueIterator; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataStreams}. 
+ */ +@RunWith(JUnit4.class) +public class DataStreamsTest { + private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData"); + private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData"); + + @Test + public void testEmptyRead() throws Exception { + assertEquals(ByteString.EMPTY, read()); + assertEquals(ByteString.EMPTY, read(ByteString.EMPTY)); + assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY)); + } + + @Test + public void testRead() throws Exception { + assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B)); + assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B)); + assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY)); + } + + @Test(timeout = 10_000) + public void testBlockingQueueIteratorWithoutBlocking() throws Exception { + BlockingQueueIterator<String> iterator = + new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3)); + + iterator.accept("A"); + iterator.accept("B"); + iterator.close(); + + assertEquals(Arrays.asList("A", "B"), + Arrays.asList(Iterators.toArray(iterator, String.class))); + } + + @Test(timeout = 10_000) + public void testBlockingQueueIteratorWithBlocking() throws Exception { + // A SynchronousQueue has no capacity, so each accept() blocks the test thread until the + // reader thread takes the element (and vice versa). + final BlockingQueueIterator<String> iterator = + new BlockingQueueIterator<>(new SynchronousQueue<>()); + final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>(); + Thread reader = new Thread() { + @Override + public void run() { + valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class))); + } + }; + reader.start(); + iterator.accept("A"); + iterator.accept("B"); + iterator.close(); + assertEquals(Arrays.asList("A", "B"), valuesFuture.get()); + reader.join(); + } + + private static ByteString read(ByteString...
bytes) throws IOException { + return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator())); + } +}
