johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreateViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244505
 
 

 ##########
 File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 ##########
 @@ -18,37 +18,43 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.*;
 
 /**
- * CreateView transform implementation.
- * @param <I> input type.
- * @param <O> output type.
+ * This transform emits materialized data for each window.
+ * @param <I> input type
+ * @param <O> materialized output type
  */
-public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> {
-  private final PCollectionView pCollectionView;
+public final class CreateViewTransform<I, O> implements
+  Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
   private OutputCollector<WindowedValue<O>> outputCollector;
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
-  private final MultiView<Object> multiView;
+  private final Map<BoundedWindow, List<I>> windowListMap;
+
+  // TODO #XXX: we should remove this variable by refactoring broadcast worker for side input
+  private boolean isEmitted = false;
+  private long outputWatermark;
 
 Review comment:
   Rename `outputWatermark` to `lastEmittedWatermark`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to