This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8b7f5eb6b00df10ec240faee162a5247478b5ee2 Author: Romain Manni-Bucau <[email protected]> AuthorDate: Mon Mar 19 18:25:32 2018 +0100 testing npe fix and exception rethrow in unit tests --- .../direct/UnboundedReadEvaluatorFactory.java | 3 ++ .../direct/UnboundedReadEvaluatorFactoryTest.java | 58 ++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index a46d657..4640efd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -199,6 +199,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } } } + if (ioe != null) { + throw ioe; + } } } } catch (IOException e) { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index af5313a..f708024 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -18,6 +18,9 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonMap; import static org.apache.beam.runners.direct.DirectGraphs.getProducer; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -25,6 +28,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -36,6 +40,7 @@ import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Range; import java.io.IOException; import java.io.InputStream; @@ -43,16 +48,19 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.NoSuchElementException; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; @@ -66,10 +74,13 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; import org.hamcrest.Matchers; import org.joda.time.DateTime; import org.joda.time.Instant; @@ -402,6 +413,53 @@ public class UnboundedReadEvaluatorFactoryTest { evaluator.processElement(shard); } + @Test // before this was throwing a NPE + public void emptySource() throws Exception { + TestUnboundedSource.readerClosedCount = 0; + final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of()); + source.advanceWatermarkToInfinity = true; + processElement(source); + assertEquals(1, TestUnboundedSource.readerClosedCount); + TestUnboundedSource.readerClosedCount = 0; // reset + } + + @Test(expected = IOException.class) + public void sourceThrowingException() throws Exception { + final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of()); + source.advanceWatermarkToInfinity = true; + source.throwOnClose = true; + processElement(source); + } + + private void processElement(final TestUnboundedSource<String> source) throws Exception { + final DirectOptions options = PipelineOptionsFactory.fromArgs() + .create().as(DirectOptions.class); + final EvaluationContext context = EvaluationContext.create( + options, MockClock.fromInstant(Instant.now()), CloningBundleFactory.create(), + DirectGraph.create(emptyMap(), emptyMap(), + LinkedListMultimap.create(), LinkedListMultimap.create(), emptySet(), emptyMap()), + emptySet()); + final UnboundedReadEvaluatorFactory factory = new UnboundedReadEvaluatorFactory(context); + + final Read.Unbounded<String> unbounded = Read.from(source); + final Pipeline pipeline = Pipeline.create(options); + final PCollection<String> pCollection = pipeline.apply(unbounded); + final AppliedPTransform<PBegin, PCollection<String>, Read.Unbounded<String>> application = + AppliedPTransform.of( + "test", new HashMap<>(), singletonMap(new TupleTag(), pCollection), + unbounded, pipeline); + final TransformEvaluator<UnboundedSourceShard<String, TestCheckpointMark>> evaluator = + factory.forApplication(application, null); + final UnboundedSource.UnboundedReader<String> reader = source.createReader(options, null); + final UnboundedSourceShard<String, TestCheckpointMark> shard = UnboundedSourceShard.of( + source, new NeverDeduplicator(), reader, null); + final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value = + WindowedValue.of( + shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + TestUnboundedSource.readerClosedCount = 0; + evaluator.processElement(value); + } + /** * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where * the timestamp is the epoch offset by the value of the element. -- To stop receiving notification emails like this one, please contact [email protected].
