johnyangk closed pull request #152: [NEMO-268] Consider start/finish bundles in Transform
URL: https://github.com/apache/incubator-nemo/pull/152
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not display it otherwise:

diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
index c07ae4856..1dd3f06e3 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineOptions.java
@@ -18,10 +18,23 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * NemoPipelineOptions.
  */
 public interface NemoPipelineOptions extends PipelineOptions {
+  @Description("The maximum number of elements in a bundle.")
+  @Default.Long(1000)
+  Long getMaxBundleSize();
+
+  void setMaxBundleSize(Long size);
+
+  @Description("The maximum time to wait before finalising a bundle (in 
milliseconds).")
+  @Default.Long(1000)
+  Long getMaxBundleTimeMills();
+
+  void setMaxBundleTimeMills(Long time);
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 92a1f1696..3585ea2fd 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -32,6 +32,8 @@
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.List;
@@ -46,6 +48,7 @@
  */
 public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDoFnTransform.class.getName());
 
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> additionalOutputTags;
@@ -62,6 +65,16 @@
   private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
   private transient DoFnRunners.OutputManager outputManager;
 
+  // For bundle
+  // we consider count and time millis for start/finish bundle
+  // if # of processed elements > bundleSize
+  // or elapsed time > bundleMillis, we finish the current bundle and start a 
new one
+  private transient long bundleSize;
+  private transient long bundleMillis;
+  private long prevBundleStartTime;
+  private long currBundleCount = 0;
+  private boolean bundleFinished = true;
+
   /**
    * AbstractDoFnTransform constructor.
    * @param doFn doFn
@@ -115,12 +128,42 @@ public final DoFn getDoFn() {
     return doFn;
   }
 
+  /**
+   * Checks whether the bundle is finished or not.
+   * Starts the bundle if it is done.
+   */
+  protected final void checkAndInvokeBundle() {
+    if (bundleFinished) {
+      bundleFinished = false;
+      doFnRunner.startBundle();
+      prevBundleStartTime = System.currentTimeMillis();
+      currBundleCount = 0;
+    }
+    currBundleCount += 1;
+  }
+
+
+  /**
+   * Checks whether it is time to finish the bundle and finish it.
+   */
+  protected final void checkAndFinishBundle() {
+    if (!bundleFinished) {
+      if (currBundleCount >= bundleSize || System.currentTimeMillis() - 
prevBundleStartTime >= bundleMillis) {
+        bundleFinished = true;
+        doFnRunner.finishBundle();
+      }
+    }
+  }
+
   @Override
   public final void prepare(final Context context, final 
OutputCollector<WindowedValue<OutputT>> oc) {
     // deserialize pipeline option
     final NemoPipelineOptions options = 
serializedOptions.get().as(NemoPipelineOptions.class);
     this.outputCollector = oc;
 
+    this.bundleMillis = options.getMaxBundleTimeMills();
+    this.bundleSize = options.getMaxBundleSize();
+
     // create output manager
     outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
 
@@ -162,8 +205,6 @@ public TimerInternals timerInternals() {
       inputCoder,
       outputCoders,
       windowingStrategy);
-
-    doFnRunner.startBundle();
   }
 
   public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
@@ -173,7 +214,9 @@ public TimerInternals timerInternals() {
   @Override
   public final void close() {
     beforeClose();
-    doFnRunner.finishBundle();
+    if (!bundleFinished) {
+      doFnRunner.finishBundle();
+    }
     doFnInvoker.invokeTeardown();
   }
 
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 4a57ada90..433f9df1f 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -64,14 +64,18 @@ protected DoFn wrapDoFn(final DoFn initDoFn) {
 
   @Override
   public void onData(final WindowedValue<InputT> data) {
+    checkAndInvokeBundle();
     getDoFnRunner().processElement(data);
+    checkAndFinishBundle();
   }
 
   @Override
   public void onWatermark(final Watermark watermark) {
+    checkAndInvokeBundle();
     // TODO #216: We should consider push-back data that waits for side input
     // TODO #216: If there are push-back data, input watermark >= output 
watermark
     getOutputCollector().emitWatermark(watermark);
+    checkAndFinishBundle();
   }
 
   @Override
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 7d20f2653..372784629 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -103,6 +103,8 @@ protected DoFn wrapDoFn(final DoFn doFn) {
    */
   @Override
   public void onData(final WindowedValue<KV<K, InputT>> element) {
+    checkAndInvokeBundle();
+
     // We can call Beam's DoFnRunner#processElement here,
     // but it may generate some overheads if we call the method for each data.
     // The `processElement` requires a `Iterator` of data, so we emit the 
buffered data every watermark.
@@ -111,6 +113,8 @@ public void onData(final WindowedValue<KV<K, InputT>> 
element) {
     final KV<K, InputT> kv = element.getValue();
     keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
     keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
+
+    checkAndFinishBundle();
   }
 
   /**
@@ -158,7 +162,9 @@ private void processElementsAndTriggerTimers(final 
Watermark inputWatermark,
 
   @Override
   public void onWatermark(final Watermark inputWatermark) {
+    checkAndInvokeBundle();
     processElementsAndTriggerTimers(inputWatermark, Instant.now(), 
Instant.now());
+    checkAndFinishBundle();
   }
 
   /**
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index b2fbaeb15..bad758485 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -20,12 +20,14 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -90,6 +92,98 @@ public void testSingleOutput() {
     doFnTransform.close();
   }
 
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCountBundle() {
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final NemoPipelineOptions pipelineOptions = 
PipelineOptionsFactory.as(NemoPipelineOptions.class);
+    pipelineOptions.setMaxBundleSize(3L);
+    pipelineOptions.setMaxBundleTimeMills(10000000L);
+
+    final List<Integer> bundleOutput = new ArrayList<>();
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new BundleTestDoFn(bundleOutput),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        pipelineOptions);
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final OutputCollector<WindowedValue<String>> oc = new 
TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+
+    assertEquals(3, (int) bundleOutput.get(0));
+
+    bundleOutput.clear();
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+
+    assertEquals(3, (int) bundleOutput.get(0));
+
+    doFnTransform.close();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeBundle() {
+
+    final long maxBundleTimeMills = 1000L;
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final NemoPipelineOptions pipelineOptions = 
PipelineOptionsFactory.as(NemoPipelineOptions.class);
+    pipelineOptions.setMaxBundleSize(10000000L);
+    pipelineOptions.setMaxBundleTimeMills(maxBundleTimeMills);
+
+    final List<Integer> bundleOutput = new ArrayList<>();
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new BundleTestDoFn(bundleOutput),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        pipelineOptions);
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final OutputCollector<WindowedValue<String>> oc = new 
TestOutputCollector<>();
+
+    long startTime = System.currentTimeMillis();
+    doFnTransform.prepare(context, oc);
+
+    int count = 0;
+    while (bundleOutput.isEmpty()) {
+      doFnTransform.onData(WindowedValue.valueInGlobalWindow("a"));
+      count += 1;
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+    assertEquals(count, (int) bundleOutput.get(0));
+    assertTrue(endTime - startTime >= maxBundleTimeMills);
+
+    doFnTransform.close();
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testMultiOutputOutput() {
@@ -194,11 +288,42 @@ public void testSideInputs() {
     doFnTransform.close();
   }
 
+
+  /**
+   * Bundle test do fn.
+   */
+  private static class BundleTestDoFn extends DoFn<String, String> {
+    int count;
+
+    private final List<Integer> bundleOutput;
+
+    BundleTestDoFn(final List<Integer> bundleOutput) {
+      this.bundleOutput = bundleOutput;
+    }
+
+    @ProcessElement
+    public void processElement(final ProcessContext c) throws Exception {
+      count += 1;
+      c.output(c.element());
+    }
+
+    @StartBundle
+    public void startBundle(final StartBundleContext c) {
+      count = 0;
+    }
+
+    @FinishBundle
+    public void finishBundle(final FinishBundleContext c) {
+      bundleOutput.add(count);
+    }
+  }
+
   /**
    * Identitiy do fn.
    * @param <T> type
    */
   private static class IdentityDoFn<T> extends DoFn<T, T> {
+
     @ProcessElement
     public void processElement(final ProcessContext c) throws Exception {
       c.output(c.element());
diff --git 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index 57303fc2f..55ed19dcb 100644
--- 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -34,7 +34,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class WindowedWordCountITCase {
-  
+
   private static final int TIMEOUT = 120000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + 
"/../resources/";


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to