Repository: incubator-beam
Updated Branches:
  refs/heads/master a1ac2222d -> 759b6cada


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 344fd4b..c63e9bd 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -171,7 +171,7 @@ public class TransformExecutorTest {
     WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
     WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
     CommittedBundle<String> inputBundle =
-        
bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
+        
bundleFactory.createBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
     
when(registry.<String>forApplication(downstream.getProducingTransformInternal(),
 inputBundle))
         .thenReturn(evaluator);
 
@@ -213,7 +213,7 @@ public class TransformExecutorTest {
 
     WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(created).add(foo).commit(Instant.now());
+        bundleFactory.createBundle(created).add(foo).commit(Instant.now());
     
when(registry.<String>forApplication(downstream.getProducingTransformInternal(),
 inputBundle))
         .thenReturn(evaluator);
 
@@ -248,7 +248,7 @@ public class TransformExecutorTest {
         };
 
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(created).commit(Instant.now());
+        bundleFactory.createBundle(created).commit(Instant.now());
     
when(registry.<String>forApplication(downstream.getProducingTransformInternal(),
 inputBundle))
         .thenReturn(evaluator);
 
@@ -328,7 +328,7 @@ public class TransformExecutorTest {
     WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo");
     WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar");
     CommittedBundle<String> inputBundle =
-        
bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now());
+        
bundleFactory.createBundle(created).add(fooElem).add(barElem).commit(Instant.now());
     when(registry.forApplication(downstream.getProducingTransformInternal(), 
inputBundle))
         .thenReturn(evaluator);
 
@@ -386,7 +386,7 @@ public class TransformExecutorTest {
 
     WindowedValue<byte[]> fooBytes = 
WindowedValue.valueInGlobalWindow("foo".getBytes());
     CommittedBundle<byte[]> inputBundle =
-        
bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
+        
bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now());
     when(registry.forApplication(pcBytes.getProducingTransformInternal(), 
inputBundle))
         .thenReturn(evaluator);
 
@@ -442,7 +442,7 @@ public class TransformExecutorTest {
 
     WindowedValue<byte[]> fooBytes = 
WindowedValue.valueInGlobalWindow("foo".getBytes());
     CommittedBundle<byte[]> inputBundle =
-        
bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
+        
bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now());
     when(registry.forApplication(pcBytes.getProducingTransformInternal(), 
inputBundle))
         .thenReturn(evaluator);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 94c9dd5..77c0bcb 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -83,8 +83,8 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     context = mock(EvaluationContext.class);
     factory = new UnboundedReadEvaluatorFactory(context);
-    output = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
+    output = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(output);
   }
 
   @Test
@@ -120,8 +120,8 @@ public class UnboundedReadEvaluatorFactoryTest {
             tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), 
tgw(5L), tgw(3L),
             tgw(0L)));
 
-    UncommittedBundle<Long> secondOutput = 
bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(secondOutput);
+    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null);
     TransformResult secondResult = secondEvaluator.finishBundle();
@@ -148,8 +148,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+    when(context.createBundle(pcollection)).thenReturn(output);
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null);
 
     evaluator.finishBundle();
@@ -157,8 +157,8 @@ public class UnboundedReadEvaluatorFactoryTest {
         output.commit(Instant.now()).getElements(),
         containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)));
 
-    UncommittedBundle<Long> secondOutput = 
bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(secondOutput);
+    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator = 
factory.forApplication(sourceTransform, null);
     secondEvaluator.finishBundle();
     assertThat(
@@ -178,8 +178,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+    when(context.createBundle(pcollection)).thenReturn(output);
 
     for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 
1; i++) {
       TransformEvaluator<?> evaluator = 
factory.forApplication(sourceTransform, null);
@@ -197,8 +197,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+    when(context.createBundle(pcollection)).thenReturn(output);
 
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null);
     evaluator.finishBundle();
@@ -222,8 +222,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+    when(context.createBundle(pcollection)).thenReturn(output);
 
     for (int i = 0; i < 2 * 
UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) {
       TransformEvaluator<?> evaluator = 
factory.forApplication(sourceTransform, null);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index ae904e4..7d14020 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -72,7 +72,7 @@ public class ViewEvaluatorFactoryTest {
     when(context.createPCollectionViewWriter(concat, 
view)).thenReturn(viewWriter);
 
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
     TransformEvaluator<Iterable<String>> evaluator =
         new ViewEvaluatorFactory(context)
             .forApplication(view.getProducingTransformInternal(), inputBundle);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index d9dc404..a722b49 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -276,7 +276,7 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(withBufferedElements.getOutputWatermark(), 
equalTo(firstCollectionTimestamp));
 
     CommittedBundle<?> completedFlattenBundle =
-        
bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        
bundleFactory.createBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(firstPcollectionBundle,
         TimerUpdate.empty(),
         result(flattened.getProducingTransformInternal(),
@@ -399,14 +399,14 @@ public class WatermarkManagerTest implements Serializable 
{
    */
   @Test
   public void updateWatermarkWithKeyedWatermarkHolds() {
-    CommittedBundle<Integer> firstKeyBundle = 
bundleFactory.createKeyedBundle(null,
+    CommittedBundle<Integer> firstKeyBundle = bundleFactory.createKeyedBundle(
         StructuralKey.of("Odd", StringUtf8Coder.of()),
         createdInts)
         .add(WindowedValue.timestampedValueInGlobalWindow(1, new 
Instant(1_000_000L)))
         .add(WindowedValue.timestampedValueInGlobalWindow(3, new 
Instant(-1000L)))
         .commit(clock.now());
 
-    CommittedBundle<Integer> secondKeyBundle = 
bundleFactory.createKeyedBundle(null,
+    CommittedBundle<Integer> secondKeyBundle = bundleFactory.createKeyedBundle(
         StructuralKey.of("Even", StringUtf8Coder.of()),
         createdInts)
         .add(WindowedValue.timestampedValueInGlobalWindow(2, new 
Instant(1234L)))
@@ -439,7 +439,7 @@ public class WatermarkManagerTest implements Serializable {
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new 
Instant(-1000L))));
 
-    CommittedBundle<Integer> fauxFirstKeyTimerBundle = 
bundleFactory.createKeyedBundle(null,
+    CommittedBundle<Integer> fauxFirstKeyTimerBundle = 
bundleFactory.createKeyedBundle(
         StructuralKey.of("Odd", StringUtf8Coder.of()),
         createdInts).commit(clock.now());
     manager.updateWatermarks(fauxFirstKeyTimerBundle,
@@ -452,7 +452,7 @@ public class WatermarkManagerTest implements Serializable {
 
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new 
Instant(1234L)));
 
-    CommittedBundle<Integer> fauxSecondKeyTimerBundle = 
bundleFactory.createKeyedBundle(null,
+    CommittedBundle<Integer> fauxSecondKeyTimerBundle = 
bundleFactory.createKeyedBundle(
         StructuralKey.of("Even", StringUtf8Coder.of()),
         createdInts).commit(clock.now());
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
@@ -482,7 +482,7 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void updateOutputWatermarkShouldBeMonotonic() {
     CommittedBundle<?> firstInput =
-        
bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        
bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null,  TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
             null,
@@ -494,7 +494,7 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
 
     CommittedBundle<?> secondInput =
-        
bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        
bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
@@ -558,7 +558,7 @@ public class WatermarkManagerTest implements Serializable {
         WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
     WindowedValue<Integer> third =
         WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
-    CommittedBundle<Integer> createdBundle = 
bundleFactory.createRootBundle(createdInts)
+    CommittedBundle<Integer> createdBundle = 
bundleFactory.createBundle(createdInts)
         .add(first)
         .add(second)
         .add(third)
@@ -657,12 +657,12 @@ public class WatermarkManagerTest implements Serializable 
{
         TimerUpdate.empty(), 
result(createdInts.getProducingTransformInternal(), null,
         Collections.<CommittedBundle<?>>singleton(
             bundleFactory
-                .createRootBundle(createdInts)
+                .createBundle(createdInts)
                 .add(WindowedValue.valueInGlobalWindow(1))
                 .commit(Instant.now()))),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    CommittedBundle<Integer> createdBundle = 
bundleFactory.createRootBundle(createdInts)
+    CommittedBundle<Integer> createdBundle = 
bundleFactory.createBundle(createdInts)
         .add(WindowedValue.valueInGlobalWindow(1))
         .commit(Instant.now());
     manager.updateWatermarks(createdBundle,
@@ -778,7 +778,7 @@ public class WatermarkManagerTest implements Serializable {
         not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
     CommittedBundle<Integer> createOutput =
-        bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
+        bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
@@ -810,7 +810,7 @@ public class WatermarkManagerTest implements Serializable {
         not(laterThan(new Instant(1250L))));
 
     CommittedBundle<?> filterOutputBundle =
-        bundleFactory.createRootBundle(intsToFlatten).commit(new 
Instant(1250L));
+        bundleFactory.createBundle(intsToFlatten).commit(new Instant(1250L));
     manager.updateWatermarks(createOutput,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
@@ -892,10 +892,10 @@ public class WatermarkManagerTest implements Serializable 
{
 
     CommittedBundle<Integer> filteredTimerBundle =
         bundleFactory
-            .createKeyedBundle(null, key, filtered)
+            .createKeyedBundle(key, filtered)
             .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     CommittedBundle<Integer> filteredTimerResult =
-        bundleFactory.createKeyedBundle(null, key, filteredTimesTwo)
+        bundleFactory.createKeyedBundle(key, filteredTimesTwo)
             .commit(filteredWms.getSynchronizedProcessingOutputTime());
     // Complete the processing time timer
     manager.updateWatermarks(filteredTimerBundle,
@@ -951,7 +951,7 @@ public class WatermarkManagerTest implements Serializable {
         not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
     CommittedBundle<Integer> createOutput =
-        bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
+        bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
@@ -967,7 +967,7 @@ public class WatermarkManagerTest implements Serializable {
         not(laterThan(clock.now())));
 
     CommittedBundle<Integer> createSecondOutput =
-        bundleFactory.createRootBundle(createdInts).commit(new Instant(750L));
+        bundleFactory.createBundle(createdInts).commit(new Instant(750L));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
@@ -1041,7 +1041,7 @@ public class WatermarkManagerTest implements Serializable 
{
         new Instant(29_919_235L));
 
     Instant upstreamHold = new Instant(2048L);
-    CommittedBundle<Integer> filteredBundle = 
bundleFactory.createKeyedBundle(created,
+    CommittedBundle<Integer> filteredBundle = bundleFactory.createKeyedBundle(
         StructuralKey.of("key", StringUtf8Coder.of()),
         filtered).commit(upstreamHold);
     manager.updateWatermarks(
@@ -1394,7 +1394,7 @@ public class WatermarkManagerTest implements Serializable 
{
   @SafeVarargs
   private final <T> CommittedBundle<T> timestampedBundle(
       PCollection<T> pc, TimestampedValue<T>... values) {
-    UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc);
+    UncommittedBundle<T> bundle = bundleFactory.createBundle(pc);
     for (TimestampedValue<T> value : values) {
       bundle.add(
           WindowedValue.timestampedValueInGlobalWindow(value.getValue(), 
value.getTimestamp()));
@@ -1404,7 +1404,7 @@ public class WatermarkManagerTest implements Serializable 
{
 
   @SafeVarargs
   private final <T> CommittedBundle<T> multiWindowedBundle(PCollection<T> pc, 
T... values) {
-    UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc);
+    UncommittedBundle<T> bundle = bundleFactory.createBundle(pc);
     Collection<BoundedWindow> windows =
         ImmutableList.of(
             GlobalWindow.INSTANCE,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 29330df..741f8f2 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -286,7 +286,7 @@ public class WindowEvaluatorFactoryTest {
   private CommittedBundle<Long> createInputBundle() {
     CommittedBundle<Long> inputBundle =
         bundleFactory
-            .createRootBundle(input)
+            .createBundle(input)
             .add(valueInGlobalWindow)
             .add(valueInGlobalAndTwoIntervalWindows)
             .add(valueInIntervalWindow)
@@ -296,8 +296,8 @@ public class WindowEvaluatorFactoryTest {
 
   private UncommittedBundle<Long> createOutputBundle(
       PCollection<Long> output, CommittedBundle<Long> inputBundle) {
-    UncommittedBundle<Long> outputBundle = 
bundleFactory.createBundle(inputBundle, output);
-    when(evaluationContext.createBundle(inputBundle, 
output)).thenReturn(outputBundle);
+    UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(output);
+    when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
     return outputBundle;
   }
 

Reply via email to