taegeonum closed pull request #154: [NEMO-272] Fix incorrect uses of Beam FinishBundle
URL: https://github.com/apache/incubator-nemo/pull/154
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
once it has been merged, so the diff is reproduced below:

diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 4e3d49c9a..2ab09a7f3 100644
--- 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -132,14 +132,14 @@ private static boolean isHDFSPath(final String path) {
   }
 
   /**
-   * Start bundle.
-   * The number of output files are determined according to the parallelism.
+   * Writes to exactly one file.
+   * (The number of total output files are determined according to the 
parallelism.)
    * i.e. if parallelism is 2, then there are total 2 output files.
-   * Each output file is written as a bundle.
-   * @param c      bundle context {@link StartBundleContext}
    */
-  @StartBundle
-  public void startBundle(final StartBundleContext c) {
+  @Setup
+  public void setup() {
+    // Creating a side-effect in Setup is discouraged, but we do it anyways 
for now as we're extending DoFn.
+    // TODO #273: Our custom HDFSWrite should implement WriteOperation
     fileName = new Path(path + UUID.randomUUID().toString());
     try {
       fileSystem = fileName.getFileSystem(new JobConf());
@@ -166,12 +166,11 @@ public void processElement(final ProcessContext c) throws 
Exception {
   }
 
   /**
-   * finish bundle.
-   * @param c             context
+   * Teardown.
    * @throws IOException  output stream exception
    */
-  @FinishBundle
-  public void finishBundle(final FinishBundleContext c) throws IOException {
+  @Teardown
+  public void tearDown() throws IOException {
     outputStream.close();
   }
 }
diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index f0f30f6da..921b86252 100644
--- 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -18,8 +18,6 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.apache.nemo.common.Pair;
@@ -61,6 +59,9 @@ private MultinomialLogisticRegression() {
     private final PCollectionView<Map<Integer, List<Double>>> modelView;
     private Map<Integer, List<Double>> model;
 
+    // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+    private ProcessContext savedContextHack;
+
     /**
      * Constructor for CalculateGradient DoFn class.
      * @param modelView PCollectionView of the model.
@@ -124,6 +125,9 @@ private void initializeGradients() {
      */
     @ProcessElement
     public void processElement(final ProcessContext c) throws Exception {
+      // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+      savedContextHack = c;
+
       final KV<Integer, Pair<List<Integer>, List<Double>>> data = 
parseLine(c.element());
       if (data == null) { // comments and newlines
         return;
@@ -224,15 +228,15 @@ public void processElement(final ProcessContext c) throws 
Exception {
     }
 
     /**
-     * FinishBundle method for BEAM.
-     * @param c Context.
+     * Teardown, since this logic at the moment should be executed exactly 
once after consuming the final data element.
+     * TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
      */
-    @FinishBundle
-    public void finishBundle(final FinishBundleContext c) {
+    @Teardown
+    public void tearDown() {
       for (Integer i = 0; i < gradients.size(); i++) {
         // this enforces a global window (batching),
         // where all data elements of the corresponding PCollection are 
grouped and emitted downstream together
-        c.output(KV.of(i, gradients.get(i)), 
BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+        savedContextHack.output(KV.of(i, gradients.get(i)));
       }
       LOG.info("stats: " + gradients.get(numClasses - 1).toString());
     }
@@ -302,13 +306,6 @@ public void processElement(final ProcessContext c) throws 
Exception {
         c.output(KV.of(kv.getKey(), ret));
       }
     }
-
-    /**
-     * FinishBundle method for BEAM.
-     */
-    @FinishBundle
-    public void finishBundle() {
-    }
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to