johnyangk closed pull request #141: [NEMO-252] Fix CreateViewTransform to emit 
windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would not otherwise display it):

diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 7dc7af65a..ee10b21f4 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -272,7 +272,7 @@ private static void windowTranslator(final 
TranslationContext ctx,
   private static void createPCollectionViewTranslator(final TranslationContext 
ctx,
                                                       final 
PrimitiveTransformVertex transformVertex,
                                                       final 
View.CreatePCollectionView<?, ?> transform) {
-    final IRVertex vertex = new OperatorVertex(new 
CreateViewTransform<>(transform.getView()));
+    final IRVertex vertex = new OperatorVertex(new 
CreateViewTransform(transform.getView().getViewFn()));
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().forEach(input -> 
ctx.addEdgeTo(vertex, input));
     ctx.registerMainOutputFrom(vertex, transform.getView());
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index d60bcfc4c..05e5af610 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -18,37 +18,43 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.*;
 
 /**
- * CreateView transform implementation.
- * @param <I> input type.
- * @param <O> output type.
+ * This transforms emits materialized data for each window.
+ * @param <I> input type
+ * @param <O> materialized output type
  */
-public final class CreateViewTransform<I, O> extends 
NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> {
-  private final PCollectionView pCollectionView;
+public final class CreateViewTransform<I, O> implements
+  Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
   private OutputCollector<WindowedValue<O>> outputCollector;
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
-  private final MultiView<Object> multiView;
+  private final Map<BoundedWindow, List<I>> windowListMap;
+
+  // TODO #259: we can remove this variable by implementing 
ReadyCheckingSideInputReader
+  private boolean isEmitted = false;
+  private long currentOutputWatermark;
 
   /**
    * Constructor of CreateViewTransform.
-   * @param pCollectionView the pCollectionView to create.
+   * @param viewFn the viewFn that materializes data.
    */
-  public CreateViewTransform(final PCollectionView<O> pCollectionView) {
-    this.pCollectionView = pCollectionView;
-    this.viewFn = this.pCollectionView.getViewFn();
-    this.multiView = new MultiView<>();
+  public CreateViewTransform(final ViewFn<Materializations.MultimapView<Void, 
?>, O> viewFn)  {
+    this.viewFn = viewFn;
+    this.windowListMap = new HashMap<>();
+    this.currentOutputWatermark = Long.MIN_VALUE;
   }
 
   @Override
@@ -57,23 +63,69 @@ public void prepare(final Context context, final 
OutputCollector<WindowedValue<O
   }
 
   @Override
-  public void onData(final WindowedValue<I> element) {
-    // TODO #216: support window in view
-    final KV kv = ((WindowedValue<KV>) element).getValue();
-    multiView.getDataList().add(kv.getValue());
+  public void onData(final WindowedValue<KV<?, I>> element) {
+    // The key of element is always null (beam's semantic)
+    // because view is a globally materialized data regardless of key
+    for (final BoundedWindow window : element.getWindows()) {
+      windowListMap.putIfAbsent(window, new ArrayList<>());
+      final List<I> list = windowListMap.get(window);
+      list.add(element.getValue().getValue());
+    }
+  }
+
+  @Override
+  public void onWatermark(final Watermark inputWatermark) {
+
+    // If no data, just forwards the watermark
+    if (windowListMap.size() == 0 && currentOutputWatermark < 
inputWatermark.getTimestamp()) {
+      currentOutputWatermark = inputWatermark.getTimestamp();
+      outputCollector.emitWatermark(inputWatermark);
+      return;
+    }
+
+    final Iterator<Map.Entry<BoundedWindow, List<I>>> iterator = 
windowListMap.entrySet().iterator();
+    long minOutputTimestampOfEmittedWindows = Long.MAX_VALUE;
+
+    while (iterator.hasNext()) {
+      final Map.Entry<BoundedWindow, List<I>> entry = iterator.next();
+      if (entry.getKey().maxTimestamp().getMillis() <= 
inputWatermark.getTimestamp()) {
+        // emit the windowed data if the watermark timestamp > the window max 
boundary
+        final O view = viewFn.apply(new MultiView<>(entry.getValue()));
+        outputCollector.emit(WindowedValue.of(
+          view, entry.getKey().maxTimestamp(), entry.getKey(), 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+        iterator.remove();
+        isEmitted = true;
+
+        minOutputTimestampOfEmittedWindows =
+          Math.min(minOutputTimestampOfEmittedWindows, 
entry.getKey().maxTimestamp().getMillis());
+      }
+    }
+
+    if (minOutputTimestampOfEmittedWindows != Long.MAX_VALUE
+      && currentOutputWatermark < minOutputTimestampOfEmittedWindows) {
+      // update current output watermark and emit to next operators
+      currentOutputWatermark = minOutputTimestampOfEmittedWindows;
+      outputCollector.emitWatermark(new Watermark(currentOutputWatermark));
+    }
   }
 
   @Override
   public void close() {
-    final Object view = viewFn.apply(multiView);
-    // TODO #216: support window in view
-    outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view));
+    onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+    if (!isEmitted) {
+      // TODO #259: This is an ad-hoc code to resolve the view that has no data
+      // Currently, broadCastWorker reads the view data, but it throws 
exception if no data is available for a view.
+      // We should use watermark value to track whether the materialized data 
in a view is available or not.
+      final O view = viewFn.apply(new MultiView<>(Collections.emptyList()));
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(view));
+    }
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("CreateViewTransform:" + pCollectionView);
+    sb.append("CreateViewTransform:" + viewFn);
     return sb.toString();
   }
 
@@ -82,23 +134,19 @@ public String toString() {
    * @param <T> primitive view type
    */
   public final class MultiView<T> implements 
Materializations.MultimapView<Void, T>, Serializable {
-    private final ArrayList<T> dataList;
+    private final Iterable<T> iterable;
 
     /**
      * Constructor.
      */
-    MultiView() {
+    MultiView(final Iterable<T> iterable) {
       // Create a placeholder for side input data. CreateViewTransform#onData 
stores data to this list.
-      dataList = new ArrayList<>();
+      this.iterable = iterable;
     }
 
     @Override
     public Iterable<T> get(@Nullable final Void aVoid) {
-      return dataList;
-    }
-
-    public ArrayList<T> getDataList() {
-      return dataList;
+      return iterable;
     }
   }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 7c44b7610..7d20f2653 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -48,6 +48,7 @@
   private final Map<K, List<WindowedValue<InputT>>> keyToValues;
   private transient InMemoryTimerInternalsFactory 
inMemoryTimerInternalsFactory;
   private transient InMemoryStateInternalsFactory 
inMemoryStateInternalsFactory;
+  private long currentOutputWatermark;
 
   /**
    * GroupByKey constructor.
@@ -69,6 +70,7 @@ public GroupByKeyAndWindowDoFnTransform(final 
Map<TupleTag<?>, Coder<?>> outputC
       options);
     this.keyToValues = new HashMap<>();
     this.reduceFn = reduceFn;
+    this.currentOutputWatermark = Long.MIN_VALUE;
   }
 
   /**
@@ -113,34 +115,50 @@ public void onData(final WindowedValue<KV<K, InputT>> 
element) {
 
   /**
    * Process the collected data and trigger timers.
-   * @param watermark current watermark
+   * @param inputWatermark current input watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
    */
-  private void processElementsAndTriggerTimers(final Watermark watermark,
+  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
                                                final Instant processingTime,
                                                final Instant synchronizedTime) 
{
-    keyToValues.forEach((key, val) -> {
+    long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE;
+
+    for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : 
keyToValues.entrySet()) {
+      final K key = entry.getKey();
+      final List<WindowedValue<InputT>> values = entry.getValue();
+
       // for each key
       // Process elements
-      if (!val.isEmpty()) {
+      if (!values.isEmpty()) {
         final KeyedWorkItem<K, InputT> keyedWorkItem =
-          KeyedWorkItems.elementsWorkItem(key, val);
+          KeyedWorkItems.elementsWorkItem(key, values);
         // The DoFnRunner interface requires WindowedValue,
         // but this windowed value is actually not used in the ReduceFnRunner 
internal.
         
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
       }
 
       // Trigger timers
-      triggerTimers(key, watermark, processingTime, synchronizedTime);
+      final long minOutputTimestamp =
+        triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
+
+      minOutputTimestampsOfEmittedWindows = 
Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+
       // Remove values
-      val.clear();
-    });
+      values.clear();
+    }
+
+    // Emit watermark to downstream operators
+    if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
+      && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
+      currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
+      getOutputCollector().emitWatermark(new 
Watermark(minOutputTimestampsOfEmittedWindows));
+    }
   }
 
   @Override
-  public void onWatermark(final Watermark watermark) {
-    processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now());
+  public void onWatermark(final Watermark inputWatermark) {
+    processElementsAndTriggerTimers(inputWatermark, Instant.now(), 
Instant.now());
   }
 
   /**
@@ -161,8 +179,10 @@ protected void beforeClose() {
    * @param watermark watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
+   * @return the minimum output timestamp.
+   * If no data is emitted, it returns Long.MAX_VALUE.
    */
-  private void triggerTimers(final K key,
+  private long triggerTimers(final K key,
                              final Watermark watermark,
                              final Instant processingTime,
                              final Instant synchronizedTime) {
@@ -179,28 +199,27 @@ private void triggerTimers(final K key,
     final List<TimerInternals.TimerData> timerDataList = 
getEligibleTimers(timerInternals);
 
     if (timerDataList.isEmpty()) {
-      return;
-    }
+      return Long.MAX_VALUE;
+    } else {
 
-    // Trigger timers and emit windowed data
-    final KeyedWorkItem<K, InputT> timerWorkItem =
-      KeyedWorkItems.timersWorkItem(key, timerDataList);
-    // The DoFnRunner interface requires WindowedValue,
-    // but this windowed value is actually not used in the ReduceFnRunner 
internal.
-    
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
-
-    // output watermark
-    // we set output watermark to the minimum of the timer data
-    long outputWatermark = Long.MAX_VALUE;
-    for (final TimerInternals.TimerData timer : timerDataList) {
-      if (outputWatermark > timer.getTimestamp().getMillis()) {
-        outputWatermark = timer.getTimestamp().getMillis();
+      // Trigger timers and emit windowed data
+      final KeyedWorkItem<K, InputT> timerWorkItem =
+        KeyedWorkItems.timersWorkItem(key, timerDataList);
+      // The DoFnRunner interface requires WindowedValue,
+      // but this windowed value is actually not used in the ReduceFnRunner 
internal.
+      
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+
+      // output watermark
+      // we set output watermark to the minimum of the timer data
+      long keyOutputTimestamp = Long.MAX_VALUE;
+      for (final TimerInternals.TimerData timer : timerDataList) {
+        keyOutputTimestamp = Math.min(keyOutputTimestamp, 
timer.getTimestamp().getMillis());
       }
-    }
 
-    // Emit watermark to downstream operators
-    timerInternals.advanceOutputWatermark(new Instant(outputWatermark));
-    getOutputCollector().emitWatermark(new Watermark(outputWatermark));
+      timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
+
+      return keyOutputTimestamp;
+    }
   }
 
   @Override
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
new file mode 100644
index 000000000..762e327fd
--- /dev/null
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class CreateViewTransformTest {
+
+  // [---- window1 --------]         [--------------- window2 ---------------]
+  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- 
watermark7
+  // (null, "hello")
+  //      (null, "world")
+  //             (null, "hello")
+  //                   ==> window1: {3} (calculate # of elements)
+  //                                 (null, "a")
+  //                                                       (null,"a")
+  //                                                             (null,"a")
+  //                                                                  
(null,"b")
+  //                                                       => window2: {4} 
(calculate # of elements)
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test() {
+
+    final FixedWindows fixedwindows = 
FixedWindows.of(Duration.standardSeconds(1));
+    final CreateViewTransform<String, Integer> viewTransform =
+      new CreateViewTransform(new SumViewFn());
+
+    final Instant ts1 = new Instant(1);
+    final Instant ts2 = new Instant(100);
+    final Instant ts3 = new Instant(300);
+    final Watermark watermark = new Watermark(1003);
+    final Instant ts4 = new Instant(1200);
+    final Watermark watermark2 = new Watermark(1400);
+    final Instant ts5 = new Instant(1600);
+    final Instant ts6 = new Instant(1800);
+    final Instant ts7 = new Instant(1900);
+    final Watermark watermark3 = new Watermark(2100);
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<Integer> oc = new TestOutputCollector();
+    viewTransform.prepare(context, oc);
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), 
PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), 
PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), 
PaneInfo.NO_FIRING));
+
+    viewTransform.onWatermark(watermark);
+
+    // materialized data
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), 
oc.outputs.get(0).getWindows());
+    assertEquals(new Integer(3), oc.outputs.get(0).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "a"), ts4, fixedwindows.assignWindow(ts4), 
PaneInfo.NO_FIRING));
+
+    // do not emit anything
+    viewTransform.onWatermark(watermark2);
+    assertEquals(0, oc.outputs.size());
+    assertEquals(0, oc.watermarks.size());
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "a"), ts5, fixedwindows.assignWindow(ts5), 
PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "a"), ts6, fixedwindows.assignWindow(ts6), 
PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "b"), ts7, fixedwindows.assignWindow(ts7), 
PaneInfo.NO_FIRING));
+
+    // emit windowed value
+    viewTransform.onWatermark(watermark3);
+
+    // materialized data
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), 
oc.outputs.get(0).getWindows());
+    assertEquals(new Integer(4), oc.outputs.get(0).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+
+    viewTransform.close();
+    assertEquals(0, oc.outputs.size());
+  }
+
+  final class SumViewFn extends ViewFn<Materializations.MultimapView<Void, 
String>, Integer> {
+
+    @Override
+    public Materialization<Materializations.MultimapView<Void, String>> 
getMaterialization() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Integer apply(final Materializations.MultimapView<Void, String> 
view) {
+      int sum = 0;
+      // MultimapView.get is Nullable
+      for (String s : view.get(null)) {
+        sum += 1;
+      }
+      return sum;
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to