Repository: incubator-beam
Updated Branches:
  refs/heads/master ac314eefd -> ccbf2b802


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
index a710753..b029dd3 100644
--- 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
+++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
@@ -75,6 +75,7 @@ public class TransformExecutorTest {
 
   private RegisteringCompletionCallback completionCallback;
   private TransformExecutorService transformEvaluationState;
+  private BundleFactory bundleFactory;
   @Mock private InProcessEvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
   private Map<TransformExecutor<?>, Boolean> scheduled;
@@ -83,6 +84,8 @@ public class TransformExecutorTest {
   public void setup() {
     MockitoAnnotations.initMocks(this);
 
+    bundleFactory = InProcessBundleFactory.create();
+
     scheduled = new HashMap<>();
     transformEvaluationState =
         
TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), 
scheduled);
@@ -157,7 +160,7 @@ public class TransformExecutorTest {
     WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
     WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
     CommittedBundle<String> inputBundle =
-        
InProcessBundle.unkeyed(created).add(foo).add(spam).add(third).commit(Instant.now());
+        
bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
     when(
             registry.<String>forApplication(
                 downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))
@@ -203,7 +206,7 @@ public class TransformExecutorTest {
 
     WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
     CommittedBundle<String> inputBundle =
-        InProcessBundle.unkeyed(created).add(foo).commit(Instant.now());
+        bundleFactory.createRootBundle(created).add(foo).commit(Instant.now());
     when(
             registry.<String>forApplication(
                 downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))
@@ -241,7 +244,8 @@ public class TransformExecutorTest {
           }
         };
 
-    CommittedBundle<String> inputBundle = 
InProcessBundle.unkeyed(created).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(created).commit(Instant.now());
     when(
             registry.<String>forApplication(
                 downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index ce4776d..dfcafaa 100644
--- 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -72,6 +72,8 @@ public class UnboundedReadEvaluatorFactoryTest {
   private InProcessEvaluationContext context;
   private UncommittedBundle<Long> output;
 
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
   @Before
   public void setup() {
     UnboundedSource<Long, ?> source =
@@ -81,7 +83,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     factory = new UnboundedReadEvaluatorFactory();
     context = mock(InProcessEvaluationContext.class);
-    output = InProcessBundle.unkeyed(longs);
+    output = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
   }
 
@@ -118,7 +120,7 @@ public class UnboundedReadEvaluatorFactoryTest {
             tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
             tgw(0L)));
 
-    UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> secondOutput = 
bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
@@ -141,6 +143,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
+    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null, context);
@@ -158,6 +161,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
+    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null, context);
@@ -175,7 +179,7 @@ public class UnboundedReadEvaluatorFactoryTest {
    */
   @Test
   public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() 
throws Exception {
-    UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> secondOutput = 
bundleFactory.createRootBundle(longs);
 
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, 
context);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
index 71abc87..b20e793 100644
--- 
a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
@@ -50,6 +50,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ViewEvaluatorFactoryTest {
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
   @Test
   public void testInMemoryEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -70,7 +72,8 @@ public class ViewEvaluatorFactoryTest {
     TestViewWriter<String, Iterable<String>> viewWriter = new 
TestViewWriter<>();
     when(context.createPCollectionViewWriter(concat, 
view)).thenReturn(viewWriter);
 
-    CommittedBundle<String> inputBundle = 
InProcessBundle.unkeyed(input).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
     TransformEvaluator<Iterable<String>> evaluator =
         new ViewEvaluatorFactory()
             .forApplication(view.getProducingTransformInternal(), inputBundle, 
context);

Reply via email to