johnyangk closed pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 7dc7af65a..ee10b21f4 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@ -272,7 +272,7 @@ private static void windowTranslator(final TranslationContext ctx, private static void createPCollectionViewTranslator(final TranslationContext ctx, final PrimitiveTransformVertex transformVertex, final View.CreatePCollectionView<?, ?> transform) { - final IRVertex vertex = new OperatorVertex(new CreateViewTransform<>(transform.getView())); + final IRVertex vertex = new OperatorVertex(new CreateViewTransform(transform.getView().getViewFn())); ctx.addVertex(vertex); transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input)); ctx.registerMainOutputFrom(vertex, transform.getView()); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java index d60bcfc4c..05e5af610 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java @@ -18,37 +18,43 @@ */ package org.apache.nemo.compiler.frontend.beam.transform; +import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.ArrayList; +import java.util.*; /** - * CreateView transform implementation. - * @param <I> input type. - * @param <O> output type. + * This transforms emits materialized data for each window. + * @param <I> input type + * @param <O> materialized output type */ -public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> { - private final PCollectionView pCollectionView; +public final class CreateViewTransform<I, O> implements + Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> { private OutputCollector<WindowedValue<O>> outputCollector; private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn; - private final MultiView<Object> multiView; + private final Map<BoundedWindow, List<I>> windowListMap; + + // TODO #259: we can remove this variable by implementing ReadyCheckingSideInputReader + private boolean isEmitted = false; + private long currentOutputWatermark; /** * Constructor of CreateViewTransform. - * @param pCollectionView the pCollectionView to create. + * @param viewFn the viewFn that materializes data. 
*/ - public CreateViewTransform(final PCollectionView<O> pCollectionView) { - this.pCollectionView = pCollectionView; - this.viewFn = this.pCollectionView.getViewFn(); - this.multiView = new MultiView<>(); + public CreateViewTransform(final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn) { + this.viewFn = viewFn; + this.windowListMap = new HashMap<>(); + this.currentOutputWatermark = Long.MIN_VALUE; } @Override @@ -57,23 +63,69 @@ public void prepare(final Context context, final OutputCollector<WindowedValue<O } @Override - public void onData(final WindowedValue<I> element) { - // TODO #216: support window in view - final KV kv = ((WindowedValue<KV>) element).getValue(); - multiView.getDataList().add(kv.getValue()); + public void onData(final WindowedValue<KV<?, I>> element) { + // The key of element is always null (beam's semantic) + // because view is a globally materialized data regardless of key + for (final BoundedWindow window : element.getWindows()) { + windowListMap.putIfAbsent(window, new ArrayList<>()); + final List<I> list = windowListMap.get(window); + list.add(element.getValue().getValue()); + } + } + + @Override + public void onWatermark(final Watermark inputWatermark) { + + // If no data, just forwards the watermark + if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) { + currentOutputWatermark = inputWatermark.getTimestamp(); + outputCollector.emitWatermark(inputWatermark); + return; + } + + final Iterator<Map.Entry<BoundedWindow, List<I>>> iterator = windowListMap.entrySet().iterator(); + long minOutputTimestampOfEmittedWindows = Long.MAX_VALUE; + + while (iterator.hasNext()) { + final Map.Entry<BoundedWindow, List<I>> entry = iterator.next(); + if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) { + // emit the windowed data if the watermark timestamp > the window max boundary + final O view = viewFn.apply(new MultiView<>(entry.getValue())); + 
outputCollector.emit(WindowedValue.of( + view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING)); + iterator.remove(); + isEmitted = true; + + minOutputTimestampOfEmittedWindows = + Math.min(minOutputTimestampOfEmittedWindows, entry.getKey().maxTimestamp().getMillis()); + } + } + + if (minOutputTimestampOfEmittedWindows != Long.MAX_VALUE + && currentOutputWatermark < minOutputTimestampOfEmittedWindows) { + // update current output watermark and emit to next operators + currentOutputWatermark = minOutputTimestampOfEmittedWindows; + outputCollector.emitWatermark(new Watermark(currentOutputWatermark)); + } } @Override public void close() { - final Object view = viewFn.apply(multiView); - // TODO #216: support window in view - outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view)); + onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + + if (!isEmitted) { + // TODO #259: This is an ad-hoc code to resolve the view that has no data + // Currently, broadCastWorker reads the view data, but it throws exception if no data is available for a view. + // We should use watermark value to track whether the materialized data in a view is available or not. + final O view = viewFn.apply(new MultiView<>(Collections.emptyList())); + outputCollector.emit(WindowedValue.valueInGlobalWindow(view)); + } } @Override public String toString() { final StringBuilder sb = new StringBuilder(); - sb.append("CreateViewTransform:" + pCollectionView); + sb.append("CreateViewTransform:" + viewFn); return sb.toString(); } @@ -82,23 +134,19 @@ public String toString() { * @param <T> primitive view type */ public final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable { - private final ArrayList<T> dataList; + private final Iterable<T> iterable; /** * Constructor. */ - MultiView() { + MultiView(final Iterable<T> iterable) { // Create a placeholder for side input data. 
CreateViewTransform#onData stores data to this list. - dataList = new ArrayList<>(); + this.iterable = iterable; } @Override public Iterable<T> get(@Nullable final Void aVoid) { - return dataList; - } - - public ArrayList<T> getDataList() { - return dataList; + return iterable; } } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 7c44b7610..7d20f2653 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -48,6 +48,7 @@ private final Map<K, List<WindowedValue<InputT>>> keyToValues; private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory; private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory; + private long currentOutputWatermark; /** * GroupByKey constructor. @@ -69,6 +70,7 @@ public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputC options); this.keyToValues = new HashMap<>(); this.reduceFn = reduceFn; + this.currentOutputWatermark = Long.MIN_VALUE; } /** @@ -113,34 +115,50 @@ public void onData(final WindowedValue<KV<K, InputT>> element) { /** * Process the collected data and trigger timers. 
- * @param watermark current watermark + * @param inputWatermark current input watermark * @param processingTime processing time * @param synchronizedTime synchronized time */ - private void processElementsAndTriggerTimers(final Watermark watermark, + private void processElementsAndTriggerTimers(final Watermark inputWatermark, final Instant processingTime, final Instant synchronizedTime) { - keyToValues.forEach((key, val) -> { + long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE; + + for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) { + final K key = entry.getKey(); + final List<WindowedValue<InputT>> values = entry.getValue(); + // for each key // Process elements - if (!val.isEmpty()) { + if (!values.isEmpty()) { final KeyedWorkItem<K, InputT> keyedWorkItem = - KeyedWorkItems.elementsWorkItem(key, val); + KeyedWorkItems.elementsWorkItem(key, values); // The DoFnRunner interface requires WindowedValue, // but this windowed value is actually not used in the ReduceFnRunner internal. 
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem)); } // Trigger timers - triggerTimers(key, watermark, processingTime, synchronizedTime); + final long minOutputTimestamp = + triggerTimers(key, inputWatermark, processingTime, synchronizedTime); + + minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp); + // Remove values - val.clear(); - }); + values.clear(); + } + + // Emit watermark to downstream operators + if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE + && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) { + currentOutputWatermark = minOutputTimestampsOfEmittedWindows; + getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows)); + } } @Override - public void onWatermark(final Watermark watermark) { - processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now()); + public void onWatermark(final Watermark inputWatermark) { + processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now()); } /** @@ -161,8 +179,10 @@ protected void beforeClose() { * @param watermark watermark * @param processingTime processing time * @param synchronizedTime synchronized time + * @return the minimum output timestamp. + * If no data is emitted, it returns Long.MAX_VALUE. 
*/ - private void triggerTimers(final K key, + private long triggerTimers(final K key, final Watermark watermark, final Instant processingTime, final Instant synchronizedTime) { @@ -179,28 +199,27 @@ private void triggerTimers(final K key, final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals); if (timerDataList.isEmpty()) { - return; - } + return Long.MAX_VALUE; + } else { - // Trigger timers and emit windowed data - final KeyedWorkItem<K, InputT> timerWorkItem = - KeyedWorkItems.timersWorkItem(key, timerDataList); - // The DoFnRunner interface requires WindowedValue, - // but this windowed value is actually not used in the ReduceFnRunner internal. - getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem)); - - // output watermark - // we set output watermark to the minimum of the timer data - long outputWatermark = Long.MAX_VALUE; - for (final TimerInternals.TimerData timer : timerDataList) { - if (outputWatermark > timer.getTimestamp().getMillis()) { - outputWatermark = timer.getTimestamp().getMillis(); + // Trigger timers and emit windowed data + final KeyedWorkItem<K, InputT> timerWorkItem = + KeyedWorkItems.timersWorkItem(key, timerDataList); + // The DoFnRunner interface requires WindowedValue, + // but this windowed value is actually not used in the ReduceFnRunner internal. 
+ getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem)); + + // output watermark + // we set output watermark to the minimum of the timer data + long keyOutputTimestamp = Long.MAX_VALUE; + for (final TimerInternals.TimerData timer : timerDataList) { + keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis()); } - } - // Emit watermark to downstream operators - timerInternals.advanceOutputWatermark(new Instant(outputWatermark)); - getOutputCollector().emitWatermark(new Watermark(outputWatermark)); + timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp)); + + return keyOutputTimestamp; + } } @Override diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java new file mode 100644 index 000000000..762e327fd --- /dev/null +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.compiler.frontend.beam.transform; + +import org.apache.beam.sdk.transforms.Materialization; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public final class CreateViewTransformTest { + + // [---- window1 --------] [--------------- window2 ---------------] + // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 + // (null, "hello") + // (null, "world") + // (null, "hello") + // ==> window1: {3} (calculate # of elements) + // (null, "a") + // (null,"a") + // (null,"a") + // (null,"b") + // => window2: {4} (calculate # of elements) + @Test + @SuppressWarnings("unchecked") + public void test() { + + final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); + final CreateViewTransform<String, Integer> viewTransform = + new CreateViewTransform(new SumViewFn()); + + final Instant ts1 = new Instant(1); + final Instant ts2 = new Instant(100); + final Instant ts3 = new Instant(300); + final Watermark watermark = new Watermark(1003); + final Instant ts4 = new Instant(1200); + final Watermark watermark2 = new Watermark(1400); + final Instant ts5 = new Instant(1600); + final Instant ts6 = new Instant(1800); + final Instant ts7 = new Instant(1900); + final Watermark watermark3 = new Watermark(2100); + + + final Transform.Context context = mock(Transform.Context.class); + final TestOutputCollector<Integer> oc = new 
TestOutputCollector(); + viewTransform.prepare(context, oc); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING)); + + viewTransform.onWatermark(watermark); + + // materialized data + assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows()); + assertEquals(new Integer(3), oc.outputs.get(0).getValue()); + + // check output watermark + assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(), + oc.watermarks.get(0).getTimestamp()); + + oc.outputs.clear(); + oc.watermarks.clear(); + + + viewTransform.onData(WindowedValue.of( + KV.of(null, "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING)); + + // do not emit anything + viewTransform.onWatermark(watermark2); + assertEquals(0, oc.outputs.size()); + assertEquals(0, oc.watermarks.size()); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING)); + + viewTransform.onData(WindowedValue.of( + KV.of(null, "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING)); + + // emit windowed value + viewTransform.onWatermark(watermark3); + + // materialized data + assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows()); + assertEquals(new Integer(4), oc.outputs.get(0).getValue()); + + // check output watermark + assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(), + oc.watermarks.get(0).getTimestamp()); + + oc.outputs.clear(); + + viewTransform.close(); + assertEquals(0, oc.outputs.size()); + } + + final class 
SumViewFn extends ViewFn<Materializations.MultimapView<Void, String>, Integer> { + + @Override + public Materialization<Materializations.MultimapView<Void, String>> getMaterialization() { + throw new UnsupportedOperationException(); + } + + @Override + public Integer apply(final Materializations.MultimapView<Void, String> view) { + int sum = 0; + // MultimapView.get is Nullable + for (String s : view.get(null)) { + sum += 1; + } + return sum; + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services