This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 3c3a82e617e74d96df2cf7d306e86257ef3d23ed
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Mon Sep 27 14:13:02 2021 +0200

    [WAYANG-34] add Terasort just TeraSort.scala working
    
    Signed-off-by: bertty <[email protected]>
---
 .../org/apache/wayang/apps/terasort/TeraApp.scala  |  2 +-
 .../org/apache/wayang/apps/terasort/TeraSort.scala | 27 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala 
b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
index f86a86e..fa8f9d4 100644
--- 
a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
+++ 
b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
@@ -47,7 +47,7 @@ object TeraApp extends ExperimentDescriptor {
 
     task match {
       case "generate" => new TeraGen(plugins: _*).apply(output_file, fileSize, 
partitions)
-      case "sort" => null
+      case "sort" => new TeraSort(plugins: _*).apply(input_file, output_file)
       case "validate" => null
     }
 
diff --git 
a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraSort.scala
 
b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraSort.scala
new file mode 100644
index 0000000..0169ec6
--- /dev/null
+++ 
b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraSort.scala
@@ -0,0 +1,27 @@
+package org.apache.wayang.apps.terasort
+
+import com.google.common.primitives.Longs
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.commons.util.profiledb.model.Experiment
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+
+/**
+ * TeraSort step of the benchmark: reads `(key, value)` byte-array records from an
+ * object file, sorts them by key and writes them to another object file.
+ *
+ * @param plugins Wayang plugins that are registered with the [[WayangContext]]
+ *                created in `apply`
+ */
+class TeraSort(@transient plugins: Plugin*) extends Serializable {
+
+  /**
+   * Builds the read -> sort -> write pipeline on a fresh [[WayangContext]].
+   *
+   * Records are ordered by their complete key, interpreted as an unsigned
+   * big-endian number. For keys of equal length this is the same as unsigned
+   * lexicographic byte order.
+   * NOTE(review): assumes all keys have the same width (presumably the 10-byte
+   * TeraGen key) - confirm against the generator.
+   *
+   * @param input_url     location of the object file with the unsorted records
+   * @param output_url    location the sorted records are written to
+   * @param configuration configuration used to create the [[WayangContext]]
+   * @param experiment    not read by this method; kept so callers stay unchanged
+   */
+  def apply(input_url: String, output_url: String)
+           (implicit configuration: Configuration, experiment: Experiment) = {
+
+    val wayangCtx = new WayangContext(configuration)
+    plugins.foreach(wayangCtx.register)
+    val planBuilder = new PlanBuilder(wayangCtx)
+
+    planBuilder
+      .readObjectFile[(Array[Byte], Array[Byte])](input_url)
+      // Sort on the whole key. A 7-byte prefix packed into a Long would leave
+      // records that only differ in the remaining key bytes unordered, and would
+      // fail on keys shorter than 7 bytes. Signum 1 keeps the value non-negative,
+      // so bytes >= 0x80 do not flip the ordering.
+      .sort(record => new java.math.BigInteger(1, record._1))
+      .writeObjectFile(output_url)
+  }
+
+}

Reply via email to