taegeonum commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230248903
########## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java ########## @@ -18,37 +18,43 @@ */ package org.apache.nemo.compiler.frontend.beam.transform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.ArrayList; +import java.util.*; /** - * CreateView transform implementation. - * @param <I> input type. - * @param <O> output type. + * This transforms emits materialized data for each window. + * @param <I> input type + * @param <O> materialized output type */ -public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> { - private final PCollectionView pCollectionView; +public final class CreateViewTransform<I, O> implements + Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> { private OutputCollector<WindowedValue<O>> outputCollector; private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn; - private final MultiView<Object> multiView; + private final Map<BoundedWindow, List<I>> windowListMap; + + // TODO #XXX: we should remove this variable by refactoring broadcast worker for side input + private boolean isEmitted = false; + private long outputWatermark; Review comment: I will change it to currentOutputWatermark ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services