This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-34 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 3c3a82e617e74d96df2cf7d306e86257ef3d23ed
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Mon Sep 27 14:13:02 2021 +0200

    [WAYANG-34] add Terasort just TeraSort.scala working

    Signed-off-by: bertty <[email protected]>
---
 .../org/apache/wayang/apps/terasort/TeraApp.scala  |  2 +-
 .../org/apache/wayang/apps/terasort/TeraSort.scala | 27 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
index f86a86e..fa8f9d4 100644
--- a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala
@@ -47,7 +47,7 @@ object TeraApp extends ExperimentDescriptor {
 
     task match {
       case "generate" => new TeraGen(plugins: _*).apply(output_file, fileSize, partitions)
-      case "sort" => null
+      case "sort" => new TeraSort(plugins: _*).apply(input_file, output_file)
       case "validate" => null
     }
 
diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraSort.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraSort.scala
new file mode 100644
index 0000000..0169ec6
--- /dev/null
+++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraSort.scala
@@ -0,0 +1,27 @@
+package org.apache.wayang.apps.terasort
+
+import com.google.common.primitives.Longs
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.commons.util.profiledb.model.Experiment
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.plugin.Plugin
+
+class TeraSort(@transient plugins: Plugin*) extends Serializable {
+
+  def apply(input_url: String, output_url: String)
+           (implicit configuration: Configuration, experiment: Experiment) = {
+
+    val wayangCtx = new WayangContext(configuration)
+    plugins.foreach(wayangCtx.register)
+    val planBuilder = new PlanBuilder(wayangCtx)
+
+    planBuilder
+      .readObjectFile[Tuple2[Array[Byte], Array[Byte]]](input_url)
+      .sort(t => {
+        val bytes = t._1;
+        Longs.fromBytes(0, bytes(0), bytes(1), bytes(2), bytes(3), bytes(4), bytes(5), bytes(6))
+      })
+      .writeObjectFile(output_url);
+  }
+
+}
