Do not produce unprocessed inputs if all inputs were processed. This stops the WatermarkManager's "Pending Bundles" from growing without bound.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8a850af3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8a850af3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8a850af3 Branch: refs/heads/master Commit: 8a850af3304a48618739dc23e286800dc0c4641a Parents: eebff90 Author: Thomas Groh <[email protected]> Authored: Tue Jun 13 12:50:58 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Jun 14 10:21:02 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/CommittedResult.java | 12 ++++----- .../beam/runners/direct/EvaluationContext.java | 26 +++++++++++++++----- .../direct/ExecutorServiceParallelExecutor.java | 9 ++++--- .../beam/runners/direct/WatermarkManager.java | 4 +-- .../runners/direct/CommittedResultTest.java | 17 +++++++------ .../runners/direct/TransformExecutorTest.java | 11 +++++++-- .../runners/direct/WatermarkManagerTest.java | 15 ++++++++--- 7 files changed, 62 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java index 8c45449..70e3ac3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java @@ -19,8 +19,8 @@ package org.apache.beam.runners.direct; import com.google.auto.value.AutoValue; +import com.google.common.base.Optional; import java.util.Set; -import javax.annotation.Nullable; import 
org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; @@ -36,12 +36,10 @@ abstract class CommittedResult { /** * Returns the {@link CommittedBundle} that contains the input elements that could not be - * processed by the evaluation. - * - * <p>{@code null} if the input bundle was null. + * processed by the evaluation. The returned optional is present if there were any unprocessed + * input elements, and absent otherwise. */ - @Nullable - public abstract CommittedBundle<?> getUnprocessedInputs(); + public abstract Optional<? extends CommittedBundle<?>> getUnprocessedInputs(); /** * Returns the outputs produced by the transform. @@ -59,7 +57,7 @@ abstract class CommittedResult { public static CommittedResult create( TransformResult<?> original, - CommittedBundle<?> unprocessedElements, + Optional<? extends CommittedBundle<?>> unprocessedElements, Iterable<? extends CommittedBundle<?>> outputs, Set<OutputType> producedOutputs) { return new AutoValue_CommittedResult(original.getTransform(), http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index e215070..d192785 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; 
import com.google.common.util.concurrent.MoreExecutors; @@ -158,12 +159,9 @@ class EvaluationContext { } else { outputTypes.add(OutputType.BUNDLE); } - CommittedResult committedResult = CommittedResult.create(result, - completedBundle == null - ? null - : completedBundle.withElements((Iterable) result.getUnprocessedElements()), - committedBundles, - outputTypes); + CommittedResult committedResult = + CommittedResult.create( + result, getUnprocessedInput(completedBundle, result), committedBundles, outputTypes); // Update state internals CopyOnAccessInMemoryStateInternals theirState = result.getState(); if (theirState != null) { @@ -187,6 +185,22 @@ class EvaluationContext { return committedResult; } + /** + * Returns an {@link Optional} containing a bundle which contains all of the unprocessed elements + * that were not processed from the {@code completedBundle}. If all of the elements of the {@code + * completedBundle} were processed, or if {@code completedBundle} is null, returns an absent + * {@link Optional}. + */ + private Optional<? extends CommittedBundle<?>> getUnprocessedInput( + @Nullable CommittedBundle<?> completedBundle, TransformResult<?> result) { + if (completedBundle == null || Iterables.isEmpty(result.getUnprocessedElements())) { + return Optional.absent(); + } + CommittedBundle<?> residual = + completedBundle.withElements((Iterable) result.getUnprocessedElements()); + return Optional.of(residual); + } + private Iterable<? extends CommittedBundle<?>> commitBundles( Iterable<? 
extends UncommittedBundle<?>> bundles) { ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder(); http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 6fe8ebd..2f4d1f6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -357,15 +357,16 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { ExecutorUpdate.fromBundle( outputBundle, graph.getPerElementConsumers(outputBundle.getPCollection()))); } - CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs(); - if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) { + Optional<? 
extends CommittedBundle<?>> unprocessedInputs = + committedResult.getUnprocessedInputs(); + if (unprocessedInputs.isPresent()) { if (inputBundle.getPCollection() == null) { // TODO: Split this logic out of an if statement - pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs); + pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get()); } else { allUpdates.offer( ExecutorUpdate.fromBundle( - unprocessedInputs, + unprocessedInputs.get(), Collections.<AppliedPTransform<?, ?, ?>>singleton( committedResult.getTransform()))); } http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 80a3504..599b74f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -994,9 +994,9 @@ class WatermarkManager { } TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform()); - if (input != null) { + if (result.getUnprocessedInputs().isPresent()) { // Add the unprocessed inputs - completedTransform.addPending(result.getUnprocessedInputs()); + completedTransform.addPending(result.getUnprocessedInputs().get()); } completedTransform.updateTimers(timerUpdate); if (input != null) { http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index cf19dc2..8b95b34 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.direct; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.Collections; @@ -72,7 +72,7 @@ public class CommittedResultTest implements Serializable { CommittedResult result = CommittedResult.create( StepTransformResult.withoutHold(transform).build(), - bundleFactory.createBundle(created).commit(Instant.now()), + Optional.<CommittedBundle<?>>absent(), Collections.<CommittedBundle<?>>emptyList(), EnumSet.noneOf(OutputType.class)); @@ -88,11 +88,11 @@ public class CommittedResultTest implements Serializable { CommittedResult result = CommittedResult.create( StepTransformResult.withoutHold(transform).build(), - bundle, + Optional.of(bundle), Collections.<CommittedBundle<?>>emptyList(), EnumSet.noneOf(OutputType.class)); - assertThat(result.getUnprocessedInputs(), + assertThat(result.getUnprocessedInputs().get(), Matchers.<CommittedBundle<?>>equalTo(bundle)); } @@ -101,11 +101,14 @@ public class CommittedResultTest implements Serializable { CommittedResult result = CommittedResult.create( StepTransformResult.withoutHold(transform).build(), - null, + Optional.<CommittedBundle<?>>absent(), Collections.<CommittedBundle<?>>emptyList(), EnumSet.noneOf(OutputType.class)); - assertThat(result.getUnprocessedInputs(), nullValue()); + assertThat( + result.getUnprocessedInputs(), + Matchers.<Optional<? 
extends CommittedBundle<?>>>equalTo( + Optional.<CommittedBundle<?>>absent())); } @Test @@ -120,7 +123,7 @@ public class CommittedResultTest implements Serializable { CommittedResult result = CommittedResult.create( StepTransformResult.withoutHold(transform).build(), - bundleFactory.createBundle(created).commit(Instant.now()), + Optional.<CommittedBundle<?>>absent(), outputs, EnumSet.of(OutputType.BUNDLE, OutputType.PCOLLECTION_VIEW)); http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 3dd4028..b7f5a7c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -25,6 +25,8 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; @@ -415,8 +417,13 @@ public class TransformExecutorTest { ? Collections.emptyList() : result.getUnprocessedElements(); - CommittedBundle<?> unprocessedBundle = - inputBundle == null ? null : inputBundle.withElements(unprocessedElements); + Optional<? 
extends CommittedBundle<?>> unprocessedBundle; + if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) { + unprocessedBundle = Optional.absent(); + } else { + unprocessedBundle = + Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements)); + } return CommittedResult.create( result, unprocessedBundle, http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index e0b5251..e3f6215 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -318,7 +319,7 @@ public class WatermarkManagerTest implements Serializable { TimerUpdate.empty(), CommittedResult.create( StepTransformResult.withoutHold(graph.getProducer(created)).build(), - root.withElements(Collections.<WindowedValue<Void>>emptyList()), + Optional.<CommittedBundle<?>>absent(), Collections.singleton(createBundle), EnumSet.allOf(OutputType.class)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -332,7 +333,7 @@ public class WatermarkManagerTest implements Serializable { TimerUpdate.empty(), CommittedResult.create( StepTransformResult.withoutHold(theFlatten).build(), - createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), + 
Optional.<CommittedBundle<?>>absent(), Collections.<CommittedBundle<?>>emptyList(), EnumSet.allOf(OutputType.class)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -345,7 +346,7 @@ public class WatermarkManagerTest implements Serializable { TimerUpdate.empty(), CommittedResult.create( StepTransformResult.withoutHold(theFlatten).build(), - createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), + Optional.<CommittedBundle<?>>absent(), Collections.<CommittedBundle<?>>emptyList(), EnumSet.allOf(OutputType.class)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -1501,9 +1502,15 @@ public class WatermarkManagerTest implements Serializable { AppliedPTransform<?, ?, ?> transform, @Nullable CommittedBundle<?> unprocessedBundle, Iterable<? extends CommittedBundle<?>> bundles) { + Optional<? extends CommittedBundle<?>> unprocessedElements; + if (unprocessedBundle == null || Iterables.isEmpty(unprocessedBundle.getElements())) { + unprocessedElements = Optional.absent(); + } else { + unprocessedElements = Optional.of(unprocessedBundle); + } return CommittedResult.create( StepTransformResult.withoutHold(transform).build(), - unprocessedBundle, + unprocessedElements, bundles, Iterables.isEmpty(bundles) ? EnumSet.noneOf(OutputType.class)
