This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-34 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 7d3f7edf814a13b9e0c4074a17225a4c66f6855d Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Mon Sep 27 14:38:28 2021 +0200 [WAYANG-34] add Terasort just TeraValidate.scala working Signed-off-by: bertty <[email protected]> --- .../apache/wayang/apps/terasort/Unsigned16.java | 3 +- .../org/apache/wayang/apps/terasort/TeraApp.scala | 4 +- .../apache/wayang/apps/terasort/TeraValidate.scala | 82 ++++++++++++++++++++++ 3 files changed, 86 insertions(+), 3 deletions(-) diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java index ae3b99e..57516b1 100644 --- a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; import org.apache.hadoop.io.Writable; /** @@ -30,7 +31,7 @@ import org.apache.hadoop.io.Writable; * * * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Unsigned16.java">Terasort Example</a> */ -class Unsigned16 implements Writable { +class Unsigned16 implements Writable, Serializable { private long hi8; private long lo8; diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala index fa8f9d4..5023ec9 100644 --- a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala +++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala @@ -41,14 +41,14 @@ object TeraApp extends ExperimentDescriptor { val partitions = args(4).toInt experiment.getSubject.addConfiguration("partitions", partitions) val input_file = 
if(args(5).equals("null")) null else args(5) - val output_file = if(args.length >= 5){ if(args(6).equals("null")) null else args(6) } else null + val output_file = if(args.length > 6){ if(args(6).equals("null")) null else args(6) } else null /* args(6) may only be read when args.length > 6; the removed `args.length >= 5` guard let a 5- or 6-element args array reach args(6) and throw ArrayIndexOutOfBoundsException */ experiment.getSubject.addConfiguration("inputFile", input_file) experiment.getSubject.addConfiguration("outputFile", output_file) task match { case "generate" => new TeraGen(plugins: _*).apply(output_file, fileSize, partitions) case "sort" => new TeraSort(plugins: _*).apply(input_file, output_file) - case "validate" => null + case "validate" => new TeraValidate(plugins: _*).apply(input_file) /* validation reads only input_file; output_file is not used by this task */ } diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala new file mode 100644 index 0000000..829a0ba --- /dev/null +++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraValidate.scala @@ -0,0 +1,82 @@ +package org.apache.wayang.apps.terasort + +import org.apache.wayang.api.PlanBuilder +import org.apache.wayang.commons.util.profiledb.model.Experiment +import org.apache.wayang.core.api.{Configuration, WayangContext} +import org.apache.wayang.core.plugin.Plugin +import org.apache.hadoop.util.PureJavaCrc32 +import com.google.common.primitives.UnsignedBytes + /** Validates sorted TeraSort output. Within each partition, keys must be non-decreasing under Guava's unsigned lexicographic byte comparator. The per-partition key ranges, in the order collect() returns them, must not overlap. A checksum (the sum of each key's CRC32 as an Unsigned16) and the record count are printed. Violations surface as assertion errors. */ +class TeraValidate(@transient plugins: Plugin*) extends Serializable { + /** Reads (key, value) byte-array pairs from input_url with readObjectFile, checks the ordering described above and prints the results; the method's value is that of its final println. */ + def apply(input_url: String) + (implicit configuration: Configuration, experiment: Experiment) = { + + val wayangCtx = new WayangContext(configuration) + plugins.foreach(wayangCtx.register) + val planBuilder = new PlanBuilder(wayangCtx) + val dataset = planBuilder + .readObjectFile[Tuple2[Array[Byte], Array[Byte]]](input_url) + + val output = dataset.mapPartitions( /* One summary tuple per partition: (sum of per-key CRC32 values, up to the first 10 bytes of the first key, up to the first 10 bytes of the last key). Only keys are checksummed; values are ignored. */ + iterable_element => { + val iter = iterable_element.iterator + val sum = new Unsigned16 + val checksum = new Unsigned16 + val crc32 = new PureJavaCrc32() + val min = new Array[Byte](10) + val max = new 
Array[Byte](10) + + val cmp = UnsignedBytes.lexicographicalComparator() + + var pos = 0L /* records seen so far; only used to detect the first key */ + var prev = new Array[Byte](10) /* ten zero bytes, the smallest 10-byte value under the unsigned comparator; baseline for the first ordering check */ + + while (iter.hasNext) { + val key = iter.next()._1 + assert(cmp.compare(key, prev) >= 0) /* keys within a partition must be non-decreasing. NOTE(review): no failure message identifies the offending position */ + + crc32.reset() + crc32.update(key, 0, key.length) + checksum.set(crc32.getValue) + sum.add(checksum) + + if (pos == 0) { + key.copyToArray(min, 0, 10) + } + pos += 1 + prev = key + } + prev.copyToArray(max, 0, 10) /* NOTE(review): for an empty partition, min and max both stay all-zero. The driver-side lastMax <= min check below then fails whenever an earlier partition had a non-zero max. Consider emitting no summary (or a record count) for empty partitions. */ + Iterator((sum, min, max)).toStream + } + ) + + val checksumOutput = output.collect() + val cmp = UnsignedBytes.lexicographicalComparator() + val sum = new Unsigned16 + var numRecords = dataset.count.collect().head /* NOTE(review): never reassigned, so this can be a val. It builds and executes a second plan over the dataset, presumably re-reading the input; summing per-partition counts from mapPartitions would avoid that. */ + + checksumOutput.foreach { case (partSum, min, max) => + sum.add(partSum) + } + println("num records: " + numRecords) + println("checksum: " + sum.toString) + var lastMax = new Array[Byte](10) /* NOTE(review): the cross-partition check below assumes collect() returns partition summaries in partition order. TODO confirm for the platforms in use. */ + checksumOutput.map{ case (partSum, min, max) => + (partSum, min.clone(), max.clone()) + }.zipWithIndex.foreach { case ((partSum, min, max), i) => + println(s"part $i") + println(s"lastMax" + lastMax.toSeq.map(x => if (x < 0) 256 + x else x)) /* bytes printed as unsigned 0..255. NOTE(review): this label lacks the trailing space the min/max labels have */ + println(s"min " + min.toSeq.map(x => if (x < 0) 256 + x else x)) + println(s"max " + max.toSeq.map(x => if (x < 0) 256 + x else x)) + assert(cmp.compare(min, max) <= 0, "min >= max") /* NOTE(review): this fails only when min > max, so the message overstates the failing condition */ + assert(cmp.compare(lastMax, min) <= 0, "current partition min < last partition max") /* ranges must not overlap: each partition's min must be >= the previous partition's max */ + lastMax = max + } + println("num records: " + numRecords) + println("checksum: " + sum.toString) + println("partitions are properly sorted") + } + +}
