Repository: incubator-beam Updated Branches: refs/heads/master a1ac2222d -> 759b6cada
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 344fd4b..c63e9bd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -171,7 +171,7 @@ public class TransformExecutorTest { WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam"); WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third"); CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); + bundleFactory.createBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); @@ -213,7 +213,7 @@ public class TransformExecutorTest { WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).add(foo).commit(Instant.now()); + bundleFactory.createBundle(created).add(foo).commit(Instant.now()); when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); @@ -248,7 +248,7 @@ public class TransformExecutorTest { }; CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).commit(Instant.now()); + bundleFactory.createBundle(created).commit(Instant.now()); when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); @@ -328,7 +328,7 @@ public class TransformExecutorTest { WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo"); WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar"); CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now()); + bundleFactory.createBundle(created).add(fooElem).add(barElem).commit(Instant.now()); when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); @@ -386,7 +386,7 @@ public class TransformExecutorTest { WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); CommittedBundle<byte[]> inputBundle = - bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); + bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now()); when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); @@ -442,7 +442,7 @@ public class TransformExecutorTest { WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); CommittedBundle<byte[]> inputBundle = - bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); + bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now()); when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 94c9dd5..77c0bcb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -83,8 +83,8 @@ public class UnboundedReadEvaluatorFactoryTest { context = mock(EvaluationContext.class); factory = new UnboundedReadEvaluatorFactory(context); - output = bundleFactory.createRootBundle(longs); - when(context.createRootBundle(longs)).thenReturn(output); + output = bundleFactory.createBundle(longs); + when(context.createBundle(longs)).thenReturn(output); } @Test @@ -120,8 +120,8 @@ public class UnboundedReadEvaluatorFactoryTest { tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L))); - UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); - when(context.createRootBundle(longs)).thenReturn(secondOutput); + UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs); + when(context.createBundle(longs)).thenReturn(secondOutput); TransformEvaluator<?> secondEvaluator = factory.forApplication(longs.getProducingTransformInternal(), null); TransformResult secondResult = secondEvaluator.finishBundle(); @@ -148,8 +148,8 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); - when(context.createRootBundle(pcollection)).thenReturn(output); + UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); + when(context.createBundle(pcollection)).thenReturn(output); TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); @@ -157,8 +157,8 @@ public class UnboundedReadEvaluatorFactoryTest { output.commit(Instant.now()).getElements(), containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L))); - UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); - when(context.createRootBundle(longs)).thenReturn(secondOutput); + UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs); + when(context.createBundle(longs)).thenReturn(secondOutput); TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null); secondEvaluator.finishBundle(); assertThat( @@ -178,8 +178,8 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); - when(context.createRootBundle(pcollection)).thenReturn(output); + UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); + when(context.createBundle(pcollection)).thenReturn(output); for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 1; i++) { TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); @@ -197,8 +197,8 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); - when(context.createRootBundle(pcollection)).thenReturn(output); + UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); + when(context.createBundle(pcollection)).thenReturn(output); TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); @@ -222,8 +222,8 @@ public class UnboundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); - when(context.createRootBundle(pcollection)).thenReturn(output); + UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); + when(context.createBundle(pcollection)).thenReturn(output); for (int i = 0; i < 2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) { TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index ae904e4..7d14020 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -72,7 +72,7 @@ public class ViewEvaluatorFactoryTest { when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter); CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(input).commit(Instant.now()); + bundleFactory.createBundle(input).commit(Instant.now()); TransformEvaluator<Iterable<String>> evaluator = new ViewEvaluatorFactory(context) .forApplication(view.getProducingTransformInternal(), inputBundle); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index d9dc404..a722b49 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -276,7 +276,7 @@ public class WatermarkManagerTest implements Serializable { assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp)); CommittedBundle<?> completedFlattenBundle = - bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstPcollectionBundle, TimerUpdate.empty(), result(flattened.getProducingTransformInternal(), @@ -399,14 +399,14 @@ public class WatermarkManagerTest implements Serializable { */ @Test public void updateWatermarkWithKeyedWatermarkHolds() { - CommittedBundle<Integer> firstKeyBundle = bundleFactory.createKeyedBundle(null, + CommittedBundle<Integer> firstKeyBundle = bundleFactory.createKeyedBundle( StructuralKey.of("Odd", StringUtf8Coder.of()), createdInts) .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L))) .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L))) .commit(clock.now()); - CommittedBundle<Integer> secondKeyBundle = bundleFactory.createKeyedBundle(null, + CommittedBundle<Integer> secondKeyBundle = bundleFactory.createKeyedBundle( StructuralKey.of("Even", StringUtf8Coder.of()), createdInts) .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))) @@ -439,7 +439,7 @@ public class WatermarkManagerTest implements Serializable { not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L)))); - CommittedBundle<Integer> fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null, + CommittedBundle<Integer> fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle( StructuralKey.of("Odd", StringUtf8Coder.of()), createdInts).commit(clock.now()); manager.updateWatermarks(fauxFirstKeyTimerBundle, @@ -452,7 +452,7 @@ public class WatermarkManagerTest implements Serializable { assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); - CommittedBundle<Integer> fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null, + CommittedBundle<Integer> fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle( StructuralKey.of("Even", StringUtf8Coder.of()), createdInts).commit(clock.now()); manager.updateWatermarks(fauxSecondKeyTimerBundle, @@ -482,7 +482,7 @@ public class WatermarkManagerTest implements Serializable { @Test public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle<?> firstInput = - bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), null, @@ -494,7 +494,7 @@ public class WatermarkManagerTest implements Serializable { assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); CommittedBundle<?> secondInput = - bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), @@ -558,7 +558,7 @@ public class WatermarkManagerTest implements Serializable { WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L)); WindowedValue<Integer> third = WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L)); - CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts) + CommittedBundle<Integer> createdBundle = bundleFactory.createBundle(createdInts) .add(first) .add(second) .add(third) @@ -657,12 +657,12 @@ public class WatermarkManagerTest implements Serializable { TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), null, Collections.<CommittedBundle<?>>singleton( bundleFactory - .createRootBundle(createdInts) + .createBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) .commit(Instant.now()))), BoundedWindow.TIMESTAMP_MAX_VALUE); - CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts) + CommittedBundle<Integer> createdBundle = bundleFactory.createBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) .commit(Instant.now()); manager.updateWatermarks(createdBundle, @@ -778,7 +778,7 @@ public class WatermarkManagerTest implements Serializable { not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); CommittedBundle<Integer> createOutput = - bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); + bundleFactory.createBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, TimerUpdate.empty(), @@ -810,7 +810,7 @@ public class WatermarkManagerTest implements Serializable { not(laterThan(new Instant(1250L)))); CommittedBundle<?> filterOutputBundle = - bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L)); + bundleFactory.createBundle(intsToFlatten).commit(new Instant(1250L)); manager.updateWatermarks(createOutput, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), @@ -892,10 +892,10 @@ public class WatermarkManagerTest implements Serializable { CommittedBundle<Integer> filteredTimerBundle = bundleFactory - .createKeyedBundle(null, key, filtered) + .createKeyedBundle(key, filtered) .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); CommittedBundle<Integer> filteredTimerResult = - bundleFactory.createKeyedBundle(null, key, filteredTimesTwo) + bundleFactory.createKeyedBundle(key, filteredTimesTwo) .commit(filteredWms.getSynchronizedProcessingOutputTime()); // Complete the processing time timer manager.updateWatermarks(filteredTimerBundle, @@ -951,7 +951,7 @@ public class WatermarkManagerTest implements Serializable { not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); CommittedBundle<Integer> createOutput = - bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); + bundleFactory.createBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, TimerUpdate.empty(), @@ -967,7 +967,7 @@ public class WatermarkManagerTest implements Serializable { not(laterThan(clock.now()))); CommittedBundle<Integer> createSecondOutput = - bundleFactory.createRootBundle(createdInts).commit(new Instant(750L)); + bundleFactory.createBundle(createdInts).commit(new Instant(750L)); manager.updateWatermarks(null, TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), @@ -1041,7 +1041,7 @@ public class WatermarkManagerTest implements Serializable { new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); - CommittedBundle<Integer> filteredBundle = bundleFactory.createKeyedBundle(created, + CommittedBundle<Integer> filteredBundle = bundleFactory.createKeyedBundle( StructuralKey.of("key", StringUtf8Coder.of()), filtered).commit(upstreamHold); manager.updateWatermarks( @@ -1394,7 +1394,7 @@ public class WatermarkManagerTest implements Serializable { @SafeVarargs private final <T> CommittedBundle<T> timestampedBundle( PCollection<T> pc, TimestampedValue<T>... values) { - UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc); + UncommittedBundle<T> bundle = bundleFactory.createBundle(pc); for (TimestampedValue<T> value : values) { bundle.add( WindowedValue.timestampedValueInGlobalWindow(value.getValue(), value.getTimestamp())); @@ -1404,7 +1404,7 @@ public class WatermarkManagerTest implements Serializable { @SafeVarargs private final <T> CommittedBundle<T> multiWindowedBundle(PCollection<T> pc, T... values) { - UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc); + UncommittedBundle<T> bundle = bundleFactory.createBundle(pc); Collection<BoundedWindow> windows = ImmutableList.of( GlobalWindow.INSTANCE, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index 29330df..741f8f2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -286,7 +286,7 @@ public class WindowEvaluatorFactoryTest { private CommittedBundle<Long> createInputBundle() { CommittedBundle<Long> inputBundle = bundleFactory - .createRootBundle(input) + .createBundle(input) .add(valueInGlobalWindow) .add(valueInGlobalAndTwoIntervalWindows) .add(valueInIntervalWindow) @@ -296,8 +296,8 @@ public class WindowEvaluatorFactoryTest { private UncommittedBundle<Long> createOutputBundle( PCollection<Long> output, CommittedBundle<Long> inputBundle) { - UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(inputBundle, output); - when(evaluationContext.createBundle(inputBundle, output)).thenReturn(outputBundle); + UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(output); + when(evaluationContext.createBundle(output)).thenReturn(outputBundle); return outputBundle; }
