This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cba4cac8e57aaf3bed5bb7b37c744af3d9b3d5d1
Author: Luke Cwik <[email protected]>
AuthorDate: Thu Jun 7 13:15:51 2018 -0700

    Update https://github.com/apache/beam/pull/4894 to correspond with changes 
to HEAD branch.
---
 .../direct/UnboundedReadEvaluatorFactoryTest.java       | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index f708024..fe47e77 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -52,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.concurrent.Executors;
 import javax.annotation.Nullable;
 import 
org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import 
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
@@ -114,6 +115,7 @@ public class UnboundedReadEvaluatorFactoryTest {
   public void setup() {
     source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
     longs = p.apply(Read.from(source));
+    options = PipelineOptionsFactory.create();
 
     context = mock(EvaluationContext.class);
     factory = new UnboundedReadEvaluatorFactory(context, options);
@@ -432,14 +434,15 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   private void processElement(final TestUnboundedSource<String> source) throws 
Exception {
-    final DirectOptions options = PipelineOptionsFactory.fromArgs()
-                                                        
.create().as(DirectOptions.class);
     final EvaluationContext context = EvaluationContext.create(
-      options, MockClock.fromInstant(Instant.now()), 
CloningBundleFactory.create(),
-        DirectGraph.create(emptyMap(), emptyMap(),
-        LinkedListMultimap.create(), LinkedListMultimap.create(), emptySet(), 
emptyMap()),
-      emptySet());
-    final UnboundedReadEvaluatorFactory factory = new 
UnboundedReadEvaluatorFactory(context);
+        MockClock.fromInstant(Instant.now()),
+        CloningBundleFactory.create(),
+        DirectGraph.create(
+            emptyMap(), emptyMap(), LinkedListMultimap.create(), emptySet(), 
emptyMap()),
+        emptySet(),
+        Executors.newCachedThreadPool());
+    final UnboundedReadEvaluatorFactory factory =
+        new UnboundedReadEvaluatorFactory(context, options);
 
     final Read.Unbounded<String> unbounded = Read.from(source);
     final Pipeline pipeline = Pipeline.create(options);

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to