johnyangk closed pull request #90: [NEMO-81] Support the Beam 'Partition' transform
URL: https://github.com/apache/incubator-nemo/pull/90
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java
 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java
new file mode 100644
index 000000000..61c6b1ddc
--- /dev/null
+++ 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import com.google.common.collect.Lists;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Per percentile statistics application.
+ */
+public final class PerPercentileAverage {
+  /**
+   * Private Constructor.
+   */
+  private PerPercentileAverage() {
+  }
+
+  /**
+   * Main function for the MR BEAM program.
+   *
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final PipelineOptions options = 
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("PerPercentileAverage");
+
+    final Pipeline p = Pipeline.create(options);
+
+    PCollection<Student> students = GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, Student>() {
+          @ProcessElement
+          public void processElement(final ProcessContext c) {
+            String[] line = c.element().split(" ");
+            c.output(new Student(Integer.parseInt(line[0]), 
Integer.parseInt(line[1]), Integer.parseInt(line[2])));
+          }
+        }))
+        .setCoder(SerializableCoder.of(Student.class));
+
+    PCollectionList<Student> studentsByPercentile =
+        // Make sure that each partition contain at least one element.
+        // If there are empty PCollections, successive WriteFiles may fail.
+        students.apply(Partition.of(10, new Partition.PartitionFn<Student>() {
+          public int partitionFor(final Student student, final int 
numPartitions) {
+            return student.getPercentile() / numPartitions;
+          }
+        }));
+
+    PCollection<String> [] results  = new PCollection[10];
+    for (int i = 0; i < 10; i++) {
+      results[i] = studentsByPercentile.get(i)
+          .apply(MapElements.via(new SimpleFunction<Student, KV<String, 
Integer>>() {
+            @Override
+            public KV<String, Integer> apply(final Student student) {
+              return KV.of("", student.getScore());
+            }
+          }))
+          .apply(GroupByKey.create())
+          .apply(MapElements.via(new SimpleFunction<KV<String, 
Iterable<Integer>>, String>() {
+            @Override
+            public String apply(final KV<String, Iterable<Integer>> kv) {
+              List<Integer> scores = Lists.newArrayList(kv.getValue());
+              final int sum = scores.stream().reduce(0, (Integer x, Integer y) 
-> x + y);
+              return scores.size() + " " + (double) sum / scores.size();
+            }
+          }));
+      GenericSourceSink.write(results[i], outputFilePath + "_" + i);
+    }
+
+    p.run();
+  }
+
+  /**
+   * Student Class.
+   */
+  static class Student implements Serializable {
+    private int id;
+    private int percentile;
+    private int score;
+
+    Student(final int id, final int percentile, final int score) {
+      this.id = id;
+      this.percentile = percentile;
+      this.score = score;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public void setId(final int id) {
+      this.id = id;
+    }
+
+    public int getPercentile() {
+      return percentile;
+    }
+
+    public void setPercentile(final int percentile) {
+      this.percentile = percentile;
+    }
+
+    public int getScore() {
+      return score;
+    }
+
+    public void setScore(final int score) {
+      this.score = score;
+    }
+  }
+}
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java
 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java
new file mode 100644
index 000000000..eaeeddc31
--- /dev/null
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test PerPercentile Average program with JobLauncher.
+ * The program writes one output file per percentile band; each is compared to its expected counterpart.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class PerPercentileAverageITCase {
+  private static final int TIMEOUT = 120000;
+  // Number of per-band output files PerPercentileAverage writes; keep in sync with that program.
+  private static final int NUM_PARTITIONS = 10;
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String inputFileName = "test_input_partition";
+  private static final String outputFileName = "test_output_partition";
+  private static final String expectedOutputFileName = "expected_output_partition";
+  private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+  private static final String inputFilePath =  fileBasePath + inputFileName;
+  private static final String outputFilePath =  fileBasePath + outputFileName;
+
+  @Before
+  public void setUp() throws Exception {
+    builder = new ArgBuilder()
+      .addResourceJson(executorResourceFileName)
+      .addUserMain(PerPercentileAverage.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath);
+  }
+
+  /**
+   * Checks every per-band output file against its expected file. ExampleTestUtil.deleteOutputFile
+   * is called in a finally block, so it runs even when validation fails.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      for (int i = 0; i < NUM_PARTITIONS; i++) {
+        ExampleTestUtil.ensureOutputValidity(fileBasePath,
+            outputFileName + "_" + i,
+            expectedOutputFileName + "_" + i);
+      }
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+
+  @Test (timeout = TIMEOUT)
+  public void test() throws Exception {
+    JobLauncher.main(builder
+      .addJobId(PerPercentileAverage.class.getSimpleName())
+      .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
+}
diff --git a/examples/resources/expected_output_partition_0 
b/examples/resources/expected_output_partition_0
new file mode 100644
index 000000000..c03e1b065
--- /dev/null
+++ b/examples/resources/expected_output_partition_0
@@ -0,0 +1 @@
+3 13.0
diff --git a/examples/resources/expected_output_partition_1 
b/examples/resources/expected_output_partition_1
new file mode 100644
index 000000000..8bfe99f79
--- /dev/null
+++ b/examples/resources/expected_output_partition_1
@@ -0,0 +1 @@
+2 22.0
diff --git a/examples/resources/expected_output_partition_2 
b/examples/resources/expected_output_partition_2
new file mode 100644
index 000000000..6bd3675cd
--- /dev/null
+++ b/examples/resources/expected_output_partition_2
@@ -0,0 +1 @@
+2 30.0
diff --git a/examples/resources/expected_output_partition_3 
b/examples/resources/expected_output_partition_3
new file mode 100644
index 000000000..c3a9fe47a
--- /dev/null
+++ b/examples/resources/expected_output_partition_3
@@ -0,0 +1 @@
+3 36.333333333333336
diff --git a/examples/resources/expected_output_partition_4 
b/examples/resources/expected_output_partition_4
new file mode 100644
index 000000000..96f2bca89
--- /dev/null
+++ b/examples/resources/expected_output_partition_4
@@ -0,0 +1 @@
+3 46.333333333333336
diff --git a/examples/resources/expected_output_partition_5 
b/examples/resources/expected_output_partition_5
new file mode 100644
index 000000000..aed062c9b
--- /dev/null
+++ b/examples/resources/expected_output_partition_5
@@ -0,0 +1 @@
+2 62.0
diff --git a/examples/resources/expected_output_partition_6 
b/examples/resources/expected_output_partition_6
new file mode 100644
index 000000000..86bf440ce
--- /dev/null
+++ b/examples/resources/expected_output_partition_6
@@ -0,0 +1 @@
+4 67.25
diff --git a/examples/resources/expected_output_partition_7 
b/examples/resources/expected_output_partition_7
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/resources/expected_output_partition_8 
b/examples/resources/expected_output_partition_8
new file mode 100644
index 000000000..d8d34a895
--- /dev/null
+++ b/examples/resources/expected_output_partition_8
@@ -0,0 +1 @@
+4 89.75
diff --git a/examples/resources/expected_output_partition_9 
b/examples/resources/expected_output_partition_9
new file mode 100644
index 000000000..15efcb051
--- /dev/null
+++ b/examples/resources/expected_output_partition_9
@@ -0,0 +1 @@
+5 96.0
diff --git a/examples/resources/test_input_partition 
b/examples/resources/test_input_partition
new file mode 100644
index 000000000..95ee9dcd9
--- /dev/null
+++ b/examples/resources/test_input_partition
@@ -0,0 +1,28 @@
+1 14 23
+2 49 57
+3 23 29
+5 37 37
+6 41 39
+7 57 63
+8 66 69
+9 91 92
+10 93 93
+11 97 98
+12 96 97
+13 82 90
+14 88 91
+15 7 13
+16 11 21
+17 3 9
+18 81 88
+19 99 100
+20 62 65
+21 83 90
+22 26 31
+23 31 34
+24 39 38
+25 44 43
+26 50 61
+27 61 64
+28 67 71
+29 8 17
\ No newline at end of file
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 68ba2c111..91c80d0e7 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -15,6 +15,8 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -24,6 +26,7 @@
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -111,9 +114,17 @@ Object fetchDataElement() throws IOException {
         if (currentIteratorIndex == expectedNumOfIterators) {
           // Entire fetcher is done
           if (noElementAtAll) {
-            // This shouldn't normally happen, except for cases such as when 
Beam's VoidCoder is used.
-            noElementAtAll = false;
-            return Void.TYPE;
+            final Optional<DecoderFactory> decoderFactory =
+                
readersForParentTask.getRuntimeEdge().getPropertyValue(DecoderProperty.class);
+
+            // TODO #173: Properly handle zero-element task outputs. Currently 
fetchDataElement relies on
+            // toString() method to distinguish whether to return Void.TYPE or 
not.
+            if (decoderFactory.get().toString().equals("VoidCoder")) {
+              noElementAtAll = false;
+              return Void.TYPE;
+            } else {
+              return null;
+            }
           } else {
             // This whole fetcher's done
             return null;
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 931e7515f..3f1fd3d2c 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -15,7 +15,11 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
 import org.junit.Test;
@@ -31,6 +35,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.when;
 
 /**
@@ -41,10 +46,11 @@
 public final class ParentTaskDataFetcherTest {
 
   @Test(timeout=5000)
-  public void testEmpty() throws Exception {
-    // InputReader
+  public void testVoid() throws Exception {
+    // TODO #173: Properly handle zero-element. This test should be updated 
too.
     final List<String> dataElements = new ArrayList<>(0); // empty data
-    final InputReader inputReader = 
generateInputReader(generateCompletableFuture(dataElements.iterator()));
+    final InputReader inputReader = 
generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
+        "VoidCoder");
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
@@ -53,6 +59,20 @@ public void testEmpty() throws Exception {
     assertEquals(Void.TYPE, fetcher.fetchDataElement());
   }
 
+  @Test(timeout=5000)
+  public void testEmpty() throws Exception {
+    // TODO #173: Properly handle zero-element. This test should be updated too.
+    // Empty input behind a non-Void decoder: the fetcher must signal end-of-data (null), not Void.TYPE.
+    final List<String> dataElements = new ArrayList<>(0); // empty data
+    final InputReader inputReader =
+        generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()), "IntCoder");
+
+    // Fetcher
+    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
+
+    // Should return null, because the decoder is not "VoidCoder"
+    assertEquals(null, fetcher.fetchDataElement());
+  }
+
   @Test(timeout=5000)
   public void testNonEmpty() throws Exception {
     // InputReader
@@ -111,6 +131,28 @@ private ParentTaskDataFetcher createFetcher(final 
InputReader readerForParentTas
         false);
   }
 
+
+  /**
+   * Creates a mocked DecoderFactory whose toString() reports the given coder name.
+   */
+  private DecoderFactory generateCoder(final String coder) {
+    final DecoderFactory mockedFactory = mock(DecoderFactory.class);
+    when(mockedFactory.toString()).thenReturn(coder);
+    return mockedFactory;
+  }
+
+  /**
+   * Builds a RuntimeEdge between two mocked vertices whose DecoderProperty wraps a mocked decoder
+   * named {@code coder}.
+   */
+  private RuntimeEdge generateEdge(final String coder) {
+    final String edgeId = "Runtime edge with coder";
+    final ExecutionPropertyMap properties = new ExecutionPropertyMap(edgeId);
+    properties.put(DecoderProperty.of(generateCoder(coder)));
+    final IRVertex srcVertex = mock(IRVertex.class);
+    final IRVertex dstVertex = mock(IRVertex.class);
+    return new RuntimeEdge<>(edgeId, properties, srcVertex, dstVertex, false);
+  }
+
+  /**
+   * Creates a mocked InputReader that returns {@code completableFuture} from read() and exposes a
+   * runtime edge whose decoder is named {@code coder}.
+   */
+  private InputReader generateInputReaderWithCoder(final CompletableFuture completableFuture,
+                                                   final String coder) {
+    final InputReader mockedReader = mock(InputReader.class);
+    when(mockedReader.read()).thenReturn(Arrays.asList(completableFuture));
+    final RuntimeEdge edgeWithCoder = generateEdge(coder);
+    when(mockedReader.getRuntimeEdge()).thenReturn(edgeWithCoder);
+    return mockedReader;
+  }
+
   private InputReader generateInputReader(final CompletableFuture 
completableFuture) {
     final InputReader inputReader = mock(InputReader.class);
     when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to