Repository: incubator-beam
Updated Branches:
  refs/heads/master e3105c8e1 -> 6914f2a1d


Refactor CommittedBundle in InProcessBundleFactory

Move to a static nested class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4e7d2e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4e7d2e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4e7d2e4

Branch: refs/heads/master
Commit: e4e7d2e4fb9da7248df7cde3877e3e6b88467dae
Parents: e3105c8
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 25 09:08:09 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Wed Apr 27 10:13:15 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InProcessBundleFactory.java       | 82 ++++++++++++--------
 1 file changed, 50 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4e7d2e4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java
index e39d02e..788fde1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java
@@ -100,38 +100,56 @@ class InProcessBundleFactory implements BundleFactory {
       checkState(!committed, "Can't commit already committed bundle %s", this);
       committed = true;
       final Iterable<WindowedValue<T>> committedElements = elements.build();
-      return new CommittedBundle<T>() {
-        @Override
-        @Nullable
-        public Object getKey() {
-          return key;
-        }
-
-        @Override
-        public Iterable<WindowedValue<T>> getElements() {
-          return committedElements;
-        }
-
-        @Override
-        public PCollection<T> getPCollection() {
-          return pcollection;
-        }
-
-        @Override
-        public Instant getSynchronizedProcessingOutputWatermark() {
-          return synchronizedCompletionTime;
-        }
-
-        @Override
-        public String toString() {
-          return MoreObjects.toStringHelper(this)
-              .omitNullValues()
-              .add("pcollection", pcollection)
-              .add("key", key)
-              .add("elements", committedElements)
-              .toString();
-        }
-      };
+      return new CommittedInProcessBundle<>(
+          pcollection, key, committedElements, synchronizedCompletionTime);
+    }
+  }
+  private static class CommittedInProcessBundle<T> implements 
CommittedBundle<T> {
+    public CommittedInProcessBundle(
+        PCollection<T> pcollection,
+        Object key,
+        Iterable<WindowedValue<T>> committedElements,
+        Instant synchronizedCompletionTime) {
+      this.pcollection = pcollection;
+      this.key = key;
+      this.committedElements = committedElements;
+      this.synchronizedCompletionTime = synchronizedCompletionTime;
+    }
+
+    private final PCollection<T> pcollection;
+    private final Object key;
+    private final Iterable<WindowedValue<T>> committedElements;
+    private final Instant synchronizedCompletionTime;
+
+    @Override
+    @Nullable
+    public Object getKey() {
+      return key;
+    }
+
+    @Override
+    public Iterable<WindowedValue<T>> getElements() {
+      return committedElements;
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return pcollection;
+    }
+
+    @Override
+    public Instant getSynchronizedProcessingOutputWatermark() {
+      return synchronizedCompletionTime;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .omitNullValues()
+          .add("pcollection", pcollection)
+          .add("key", key)
+          .add("elements", committedElements)
+          .toString();
     }
   }
 }

Reply via email to