Repository: incubator-beam Updated Branches: refs/heads/master ac314eefd -> ccbf2b802
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java index a710753..b029dd3 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -75,6 +75,7 @@ public class TransformExecutorTest { private RegisteringCompletionCallback completionCallback; private TransformExecutorService transformEvaluationState; + private BundleFactory bundleFactory; @Mock private InProcessEvaluationContext evaluationContext; @Mock private TransformEvaluatorRegistry registry; private Map<TransformExecutor<?>, Boolean> scheduled; @@ -83,6 +84,8 @@ public class TransformExecutorTest { public void setup() { MockitoAnnotations.initMocks(this); + bundleFactory = InProcessBundleFactory.create(); + scheduled = new HashMap<>(); transformEvaluationState = TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled); @@ -157,7 +160,7 @@ public class TransformExecutorTest { WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam"); WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third"); CommittedBundle<String> inputBundle = - InProcessBundle.unkeyed(created).add(foo).add(spam).add(third).commit(Instant.now()); + bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); when( registry.<String>forApplication( downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) @@ -203,7 +206,7 @@ public class TransformExecutorTest { WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); CommittedBundle<String> inputBundle = - InProcessBundle.unkeyed(created).add(foo).commit(Instant.now()); + bundleFactory.createRootBundle(created).add(foo).commit(Instant.now()); when( registry.<String>forApplication( downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) @@ -241,7 +244,8 @@ public class TransformExecutorTest { } }; - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(created).commit(Instant.now()); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(created).commit(Instant.now()); when( registry.<String>forApplication( downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java index ce4776d..dfcafaa 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java @@ -72,6 +72,8 @@ public class UnboundedReadEvaluatorFactoryTest { private InProcessEvaluationContext context; private UncommittedBundle<Long> output; + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Before public void setup() { UnboundedSource<Long, ?> source = @@ -81,7 +83,7 @@ public class UnboundedReadEvaluatorFactoryTest { factory = new UnboundedReadEvaluatorFactory(); context = mock(InProcessEvaluationContext.class); - output = InProcessBundle.unkeyed(longs); + output = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output); } @@ -118,7 +120,7 @@ public class UnboundedReadEvaluatorFactoryTest { tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L))); - UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); TransformEvaluator<?> secondEvaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); @@ -141,6 +143,7 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); @@ -158,6 +161,7 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); @@ -175,7 +179,7 @@ public class UnboundedReadEvaluatorFactoryTest { */ @Test public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception { - UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); TransformEvaluator<?> evaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java index 71abc87..b20e793 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java @@ -50,6 +50,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class ViewEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -70,7 +72,8 @@ public class ViewEvaluatorFactoryTest { TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>(); when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); TransformEvaluator<Iterable<String>> evaluator = new ViewEvaluatorFactory() .forApplication(view.getProducingTransformInternal(), inputBundle, context);
