Repository: incubator-beam Updated Branches: refs/heads/master e3105c8e1 -> 6914f2a1d
Refactor CommittedBundle in InProcessBundleFactory Move to a static nested class. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4e7d2e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4e7d2e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4e7d2e4 Branch: refs/heads/master Commit: e4e7d2e4fb9da7248df7cde3877e3e6b88467dae Parents: e3105c8 Author: Thomas Groh <tg...@google.com> Authored: Mon Apr 25 09:08:09 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Wed Apr 27 10:13:15 2016 -0700 ---------------------------------------------------------------------- .../inprocess/InProcessBundleFactory.java | 82 ++++++++++++-------- 1 file changed, 50 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4e7d2e4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java index e39d02e..788fde1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java @@ -100,38 +100,56 @@ class InProcessBundleFactory implements BundleFactory { checkState(!committed, "Can't commit already committed bundle %s", this); committed = true; final Iterable<WindowedValue<T>> committedElements = elements.build(); - return new CommittedBundle<T>() { - @Override - @Nullable - public Object getKey() { - return key; - } - - @Override - public Iterable<WindowedValue<T>> getElements() { - return committedElements; - } - - @Override - public PCollection<T> getPCollection() { - return pcollection; - } - - @Override - public Instant getSynchronizedProcessingOutputWatermark() { - return synchronizedCompletionTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .omitNullValues() - .add("pcollection", pcollection) - .add("key", key) - .add("elements", committedElements) - .toString(); - } - }; + return new CommittedInProcessBundle<>( + pcollection, key, committedElements, synchronizedCompletionTime); + } + } + private static class CommittedInProcessBundle<T> implements CommittedBundle<T> { + public CommittedInProcessBundle( + PCollection<T> pcollection, + Object key, + Iterable<WindowedValue<T>> committedElements, + Instant synchronizedCompletionTime) { + this.pcollection = pcollection; + this.key = key; + this.committedElements = committedElements; + this.synchronizedCompletionTime = synchronizedCompletionTime; + } + + private final PCollection<T> pcollection; + private final Object key; + private final Iterable<WindowedValue<T>> committedElements; + private final Instant synchronizedCompletionTime; + + @Override + @Nullable + public Object getKey() { + return key; + } + + @Override + public Iterable<WindowedValue<T>> getElements() { + return committedElements; + } + + @Override + public PCollection<T> getPCollection() { + return pcollection; + } + + @Override + public Instant getSynchronizedProcessingOutputWatermark() { + return synchronizedCompletionTime; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); } } }