taegeonum closed pull request #154: [NEMO-272] Fix incorrect uses of Beam FinishBundle
URL: https://github.com/apache/incubator-nemo/pull/154
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 4e3d49c9a..2ab09a7f3 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -132,14 +132,14 @@ private static boolean isHDFSPath(final String path) {
}
/**
- * Start bundle.
- * The number of output files are determined according to the parallelism.
+ * Writes to exactly one file.
+ * (The number of total output files are determined according to the parallelism.)
* i.e. if parallelism is 2, then there are total 2 output files.
- * Each output file is written as a bundle.
- * @param c bundle context {@link StartBundleContext}
*/
- @StartBundle
- public void startBundle(final StartBundleContext c) {
+ @Setup
+ public void setup() {
+ // Creating a side-effect in Setup is discouraged, but we do it anyways for now as we're extending DoFn.
+ // TODO #273: Our custom HDFSWrite should implement WriteOperation
fileName = new Path(path + UUID.randomUUID().toString());
try {
fileSystem = fileName.getFileSystem(new JobConf());
@@ -166,12 +166,11 @@ public void processElement(final ProcessContext c) throws Exception {
}
/**
- * finish bundle.
- * @param c context
+ * Teardown.
* @throws IOException output stream exception
*/
- @FinishBundle
- public void finishBundle(final FinishBundleContext c) throws IOException {
+ @Teardown
+ public void tearDown() throws IOException {
outputStream.close();
}
}
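For context, and not part of the PR diff: in the Beam model a DoFn instance may process many bundles, so @StartBundle/@FinishBundle can run many times per instance, while @Setup/@Teardown run once per instance; that is why the per-file open and close above move from the bundle hooks to the setup/teardown hooks. A minimal lifecycle sketch (hypothetical LifecycleSketchFn, assuming the Beam Java SDK), not taken from the repository:

import org.apache.beam.sdk.transforms.DoFn;

// Sketch of the DoFn lifecycle hooks referenced in the diff above.
// setup/tearDown run once per DoFn instance; startBundle/finishBundle
// run once per bundle and may be invoked many times on the same instance.
public class LifecycleSketchFn extends DoFn<String, String> {
  @Setup
  public void setup() {
    // acquire per-instance resources here (e.g. open an output stream)
  }

  @StartBundle
  public void startBundle() {
    // per-bundle initialization; may run many times per instance
  }

  @ProcessElement
  public void processElement(final ProcessContext c) {
    c.output(c.element());
  }

  @FinishBundle
  public void finishBundle(final FinishBundleContext c) {
    // per-bundle flush; also may run many times per instance
  }

  @Teardown
  public void tearDown() {
    // release per-instance resources; called at most once per instance
  }
}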
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index f0f30f6da..921b86252 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -18,8 +18,6 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
import org.apache.nemo.common.Pair;
@@ -61,6 +59,9 @@ private MultinomialLogisticRegression() {
private final PCollectionView<Map<Integer, List<Double>>> modelView;
private Map<Integer, List<Double>> model;
+ // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+ private ProcessContext savedContextHack;
+
/**
* Constructor for CalculateGradient DoFn class.
* @param modelView PCollectionView of the model.
@@ -124,6 +125,9 @@ private void initializeGradients() {
*/
@ProcessElement
public void processElement(final ProcessContext c) throws Exception {
+ // TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
+ savedContextHack = c;
+
final KV<Integer, Pair<List<Integer>, List<Double>>> data = parseLine(c.element());
if (data == null) { // comments and newlines
return;
@@ -224,15 +228,15 @@ public void processElement(final ProcessContext c) throws Exception {
}
/**
- * FinishBundle method for BEAM.
- * @param c Context.
+ * Teardown, since this logic at the moment should be executed exactly once after consuming the final data element.
+ * TODO #274: Use bundles properly in Beam MultinomialLogisticRegression
*/
- @FinishBundle
- public void finishBundle(final FinishBundleContext c) {
+ @Teardown
+ public void tearDown() {
for (Integer i = 0; i < gradients.size(); i++) {
// this enforces a global window (batching),
// where all data elements of the corresponding PCollection are grouped and emitted downstream together
- c.output(KV.of(i, gradients.get(i)), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+ savedContextHack.output(KV.of(i, gradients.get(i)));
}
LOG.info("stats: " + gradients.get(numClasses - 1).toString());
}
@@ -302,13 +306,6 @@ public void processElement(final ProcessContext c) throws Exception {
c.output(KV.of(kv.getKey(), ret));
}
}
-
- /**
- * FinishBundle method for BEAM.
- */
- @FinishBundle
- public void finishBundle() {
- }
}
/**
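For reference, and not part of the PR diff: the removed finishBundle relied on the three-argument FinishBundleContext.output, which takes an explicit timestamp and window because a bundle is not tied to a single window; the replacement caches the last ProcessContext and emits through it in @Teardown, a stopgap flagged by TODO #274. A minimal sketch of the usual per-bundle buffering pattern (hypothetical BufferingFn and buffer names, assuming the Beam Java SDK), not taken from the repository:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

// Sketch of the per-bundle flush contract: the buffer is reset in
// startBundle and drained in finishBundle, so output is emitted once
// per bundle rather than once per DoFn instance.
public class BufferingFn extends DoFn<String, String> {
  private List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(final ProcessContext c) {
    buffer.add(c.element());
  }

  @FinishBundle
  public void finishBundle(final FinishBundleContext c) {
    for (final String s : buffer) {
      // FinishBundleContext.output needs an explicit timestamp and window,
      // since a bundle is not associated with a single window.
      c.output(s, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
    }
  }
}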
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services