Add InProcess Override for CreatePCollectionView

Because windowing is used to retrieve values from a
PCollectionView, the elements must go through a GroupByKey.

Provide a PTransform override, InProcessCreatePCollectionView, for use in the
InProcessPipelineRunner. It groups all elements under a single key, and therefore
by window and pane. Also provide a WriteView primitive that stores the grouped
contents in the PCollectionView.

Update View.CreatePCollectionView to expose the view it returns through a new
getView() accessor, so that the view is available outside of its apply method.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115409320


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87b28e7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87b28e7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87b28e7d

Branch: refs/heads/master
Commit: 87b28e7dd1b2f8ca31eb155cc5bad4f98717664b
Parents: 6613031
Author: tgroh <[email protected]>
Authored: Tue Feb 23 19:12:37 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:27 2016 -0800

----------------------------------------------------------------------
 .../inprocess/InProcessPipelineRunner.java      |  5 +-
 .../runners/inprocess/ViewEvaluatorFactory.java | 69 +++++++++++++++++---
 .../cloud/dataflow/sdk/transforms/View.java     |  5 ++
 .../inprocess/ViewEvaluatorFactoryTest.java     | 33 ++++++++--
 4 files changed, 96 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 72642da..26c5061 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -20,10 +20,12 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
 import com.google.cloud.dataflow.sdk.util.BaseExecutionContext;
@@ -60,6 +62,7 @@ public class InProcessPipelineRunner {
       defaultTransformOverrides =
           ImmutableMap.<Class<? extends PTransform>, Class<? extends 
PTransform>>builder()
               .put(GroupByKey.class, InProcessGroupByKey.class)
+              .put(CreatePCollectionView.class, 
InProcessCreatePCollectionView.class)
               .build();
 
   private static Map<Class<?>, TransformEvaluatorFactory> 
defaultEvaluatorFactories =
@@ -222,7 +225,7 @@ public class InProcessPipelineRunner {
      * Create a bundle whose elements will be used in a PCollectionView.
      */
     <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> 
createPCollectionViewWriter(
-        PCollection<ElemT> input, PCollectionView<ViewT> output);
+        PCollection<Iterable<ElemT>> input, PCollectionView<ViewT> output);
 
     /**
      * Get the options used by this {@link Pipeline}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
index 654652c..f47cd1d 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
@@ -15,11 +15,16 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.Values;
 import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
@@ -30,6 +35,12 @@ import java.util.List;
 /**
  * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for 
the
  * {@link CreatePCollectionView} primitive {@link PTransform}.
+ *
+ * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator 
TransformEvaluators} for
+ * the {@link WriteView} {@link PTransform}, which is part of the
+ * {@link InProcessCreatePCollectionView} composite transform. This transform 
is an override for the
+ * {@link CreatePCollectionView} transform that applies windowing and triggers 
before the view is
+ * written.
  */
 class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   @SuppressWarnings({"rawtypes", "unchecked"})
@@ -42,19 +53,21 @@ class ViewEvaluatorFactory implements 
TransformEvaluatorFactory {
         (AppliedPTransform) application, evaluationContext);
   }
 
-  private <InT, OuT> TransformEvaluator<InT> createEvaluator(
-      final AppliedPTransform<PCollection<InT>, PCollectionView<OuT>,
-      CreatePCollectionView<InT, OuT>> application,
+  private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
+      final AppliedPTransform<PCollection<Iterable<InT>>, 
PCollectionView<OuT>, WriteView<InT, OuT>>
+          application,
       InProcessEvaluationContext context) {
-    PCollection<InT> input = application.getInput();
+    PCollection<Iterable<InT>> input = application.getInput();
     final PCollectionViewWriter<InT, OuT> writer =
         context.createPCollectionViewWriter(input, application.getOutput());
-    return new TransformEvaluator<InT>() {
+    return new TransformEvaluator<Iterable<InT>>() {
       private final List<WindowedValue<InT>> elements = new ArrayList<>();
 
       @Override
-      public void processElement(WindowedValue<InT> element) {
-        elements.add(element);
+      public void processElement(WindowedValue<Iterable<InT>> element) {
+        for (InT input : element.getValue()) {
+          elements.add(element.withValue(input));
+        }
       }
 
       @Override
@@ -64,5 +77,45 @@ class ViewEvaluatorFactory implements 
TransformEvaluatorFactory {
       }
     };
   }
-}
 
+  /**
+   * An in-process override for {@link CreatePCollectionView}.
+   *
+   * <p>Rather than producing the view directly, this composite keys every input element with a
+   * single {@code null} {@link Void} key, applies a {@link GroupByKey}, drops the key with
+   * {@link Values}, and hands the resulting {@code Iterable} of elements to a {@link WriteView}.
+   * Routing the elements through a {@link GroupByKey} lets the view contents be assembled per
+   * window and pane.
+   *
+   * @param <ElemT> the type of the elements in the input {@link PCollection}
+   * @param <ViewT> the type of the value produced by the resulting {@link PCollectionView}
+   */
+  public static class InProcessCreatePCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+    // The original transform being overridden; forwarded to WriteView, which returns its view.
+    private final CreatePCollectionView<ElemT, ViewT> og;
+
+    // NOTE(review): private with no factory method; presumably instantiated reflectively by the
+    // InProcessPipelineRunner transform-override mechanism with the original transform as the
+    // sole argument. Confirm before changing visibility or the parameter list.
+    private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> 
og) {
+      this.og = og;
+    }
+
+    /**
+     * Groups all of {@code input} under a single key and writes the grouped contents to the view
+     * held by the original {@link CreatePCollectionView}.
+     *
+     * @param input the elements that make up the view
+     * @return the {@link PCollectionView} of the original {@link CreatePCollectionView}
+     */
+    @Override
+    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+      return input.apply(WithKeys.<Void, ElemT>of((Void) null))
+          // Set the KV coder explicitly: a Void key coder paired with the input's element coder.
+          .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
+          .apply(GroupByKey.<Void, ElemT>create())
+          .apply(Values.<Iterable<ElemT>>create())
+          .apply(new WriteView<ElemT, ViewT>(og));
+    }
+  }
+
+  /**
+   * An in-process implementation of the {@link CreatePCollectionView} primitive.
+   *
+   * <p>This implementation requires the input to be a {@link PCollection} of {@code Iterable}s,
+   * whose contents are provided to {@link PCollectionView#fromIterableInternal(Iterable)}.
+   *
+   * <p>{@link #apply} does not construct a new view; it returns the view held by the original
+   * {@link CreatePCollectionView}. The view contents are written when the
+   * {@link ViewEvaluatorFactory} evaluates this transform.
+   *
+   * @param <ElemT> the type of the elements contained in each input {@code Iterable}
+   * @param <ViewT> the type of the value produced by the {@link PCollectionView}
+   */
+  public static final class WriteView<ElemT, ViewT>
+      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> 
{
+    // The original transform this primitive stands in for; supplies the returned view.
+    private final CreatePCollectionView<ElemT, ViewT> og;
+
+    // Package-private: constructed by InProcessCreatePCollectionView and by tests in this package.
+    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
+      this.og = og;
+    }
+
+    /**
+     * Returns the view of the original {@link CreatePCollectionView}.
+     *
+     * @param input the grouped elements that make up the view
+     * @return the same {@link PCollectionView} instance held by the original transform
+     */
+    @Override
+    public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
+      // Reuse the original view instance rather than creating a new one.
+      return og.getView();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java
index a41da34..e2c4487 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java
@@ -17,6 +17,7 @@
 package com.google.cloud.dataflow.sdk.transforms;
 
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.util.PCollectionViews;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
@@ -435,6 +436,10 @@ public class View {
       return new CreatePCollectionView<>(view);
     }
 
+    /**
+     * Returns the {@link PCollectionView} that this transform returns from {@link #apply}.
+     *
+     * <p>Exposed so that runner-specific overrides of this transform can obtain the view without
+     * applying this transform.
+     */
+    public PCollectionView<ViewT> getView() {
+      return view;
+    }
+
     @Override
     public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
       return view;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
index c29308f..021709b 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
@@ -21,16 +21,24 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
 import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.util.PCollectionViews;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.common.collect.ImmutableList;
 
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -45,19 +53,30 @@ public class ViewEvaluatorFactoryTest {
   @Test
   public void testInMemoryEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
+
     PCollection<String> input = p.apply(Create.of("foo", "bar"));
-    PCollectionView<Iterable<String>> view = 
input.apply(View.<String>asIterable());
+    CreatePCollectionView<String, Iterable<String>> createView =
+        CreatePCollectionView.of(
+            PCollectionViews.iterableView(p, input.getWindowingStrategy(), 
StringUtf8Coder.of()));
+    PCollection<Iterable<String>> concat =
+        input.apply(WithKeys.<Void, String>of((Void) null))
+            .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
+            .apply(GroupByKey.<Void, String>create())
+            .apply(Values.<Iterable<String>>create());
+    PCollectionView<Iterable<String>> view =
+        concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
 
     InProcessEvaluationContext context = 
mock(InProcessEvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new 
TestViewWriter<>();
-    when(context.createPCollectionViewWriter(input, 
view)).thenReturn(viewWriter);
+    when(context.createPCollectionViewWriter(concat, 
view)).thenReturn(viewWriter);
 
     CommittedBundle<String> inputBundle = 
InProcessBundle.unkeyed(input).commit(Instant.now());
-    TransformEvaluator<String> evaluator = new 
ViewEvaluatorFactory().forApplication(
-        view.getProducingTransformInternal(), inputBundle, context);
+    TransformEvaluator<Iterable<String>> evaluator =
+        new ViewEvaluatorFactory()
+            .forApplication(view.getProducingTransformInternal(), inputBundle, 
context);
 
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("bar"));
+    evaluator.processElement(
+        
WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", 
"bar")));
     assertThat(viewWriter.latest, nullValue());
 
     evaluator.finishBundle();

Reply via email to