This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8b7f5eb6b00df10ec240faee162a5247478b5ee2
Author: Romain Manni-Bucau <[email protected]>
AuthorDate: Mon Mar 19 18:25:32 2018 +0100

    Test the NPE fix and exception rethrow in unit tests
---
 .../direct/UnboundedReadEvaluatorFactory.java      |  3 ++
 .../direct/UnboundedReadEvaluatorFactoryTest.java  | 58 ++++++++++++++++++++++
 2 files changed, 61 insertions(+)

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index a46d657..4640efd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -199,6 +199,9 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
                 }
               }
             }
+            if (ioe != null) {
+              throw ioe;
+            }
           }
         }
       } catch (IOException e) {
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index af5313a..f708024 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -18,6 +18,9 @@
 package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonMap;
 import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -25,6 +28,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -36,6 +40,7 @@ import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Range;
 import java.io.IOException;
 import java.io.InputStream;
@@ -43,16 +48,19 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import javax.annotation.Nullable;
 import 
org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import 
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
@@ -66,10 +74,13 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
@@ -402,6 +413,53 @@ public class UnboundedReadEvaluatorFactoryTest {
     evaluator.processElement(shard);
   }
 
+  @Test // previously this threw an NPE
+  public void emptySource() throws Exception {
+    TestUnboundedSource.readerClosedCount = 0;
+    final TestUnboundedSource<String> source = new 
TestUnboundedSource<>(StringUtf8Coder.of());
+    source.advanceWatermarkToInfinity = true;
+    processElement(source);
+    assertEquals(1, TestUnboundedSource.readerClosedCount);
+    TestUnboundedSource.readerClosedCount = 0; // reset
+  }
+
+  @Test(expected = IOException.class)
+  public void sourceThrowingException() throws Exception {
+    final TestUnboundedSource<String> source = new 
TestUnboundedSource<>(StringUtf8Coder.of());
+    source.advanceWatermarkToInfinity = true;
+    source.throwOnClose = true;
+    processElement(source);
+  }
+
+  private void processElement(final TestUnboundedSource<String> source) throws 
Exception {
+    final DirectOptions options = PipelineOptionsFactory.fromArgs()
+                                                        
.create().as(DirectOptions.class);
+    final EvaluationContext context = EvaluationContext.create(
+      options, MockClock.fromInstant(Instant.now()), 
CloningBundleFactory.create(),
+        DirectGraph.create(emptyMap(), emptyMap(),
+        LinkedListMultimap.create(), LinkedListMultimap.create(), emptySet(), 
emptyMap()),
+      emptySet());
+    final UnboundedReadEvaluatorFactory factory = new 
UnboundedReadEvaluatorFactory(context);
+
+    final Read.Unbounded<String> unbounded = Read.from(source);
+    final Pipeline pipeline = Pipeline.create(options);
+    final PCollection<String> pCollection = pipeline.apply(unbounded);
+    final AppliedPTransform<PBegin, PCollection<String>, 
Read.Unbounded<String>> application =
+      AppliedPTransform.of(
+        "test", new HashMap<>(), singletonMap(new TupleTag(), pCollection),
+              unbounded, pipeline);
+    final TransformEvaluator<UnboundedSourceShard<String, TestCheckpointMark>> 
evaluator =
+      factory.forApplication(application, null);
+    final UnboundedSource.UnboundedReader<String> reader = 
source.createReader(options, null);
+    final UnboundedSourceShard<String, TestCheckpointMark> shard = 
UnboundedSourceShard.of(
+      source, new NeverDeduplicator(), reader, null);
+    final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> 
value =
+      WindowedValue.of(
+        shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING);
+    TestUnboundedSource.readerClosedCount = 0;
+    evaluator.processElement(value);
+  }
+
   /**
    * A terse alias for producing timestamped longs in the {@link 
GlobalWindow}, where
    * the timestamp is the epoch offset by the value of the element.

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to