This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-34 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 311a1e03466b1c46eeda49774109159a266c4c32 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Mon Sep 27 14:00:44 2021 +0200 [WAYANG-34] add Terasort just TeraGen running Signed-off-by: bertty <[email protected]> --- wayang-benchmark/pom.xml | 15 + .../org/apache/wayang/apps/terasort/Random16.java | 376 +++++++++++++++++++++ .../apache/wayang/apps/terasort/Unsigned16.java | 299 ++++++++++++++++ .../org/apache/wayang/apps/terasort/TeraApp.scala | 138 ++++++++ .../org/apache/wayang/apps/terasort/TeraGen.scala | 111 ++++++ 5 files changed, 939 insertions(+) diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml index f230ee0..a31edb1 100644 --- a/wayang-benchmark/pom.xml +++ b/wayang-benchmark/pom.xml @@ -59,6 +59,21 @@ <artifactId>wayang-sqlite3</artifactId> <version>0.6.1-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>2.7.7</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs-client</artifactId> + <version>2.7.7</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.mayor.version}</artifactId> + <version>${spark.version}</version> + </dependency> </dependencies> <modules> diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java new file mode 100644 index 0000000..6e85d92 --- /dev/null +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Random16.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.apps.terasort; + +/** + * This class implements a 128-bit linear congruential generator. + * Specifically, if X0 is the most recently issued 128-bit random + * number (or a seed of 0 if no random number has already been generated), + * the next number to be generated, X1, is equal to: + * X1 = (a * X0 + c) mod 2**128 + * where a is 47026247687942121848144207491837523525 + * or 0x2360ed051fc65da44385df649fccf645 + * and c is 98910279301475397889117759788405497857 + * or 0x4a696d47726179524950202020202001 + * The coefficient "a" is suggested by: + * Pierre L'Ecuyer, "Tables of linear congruential generators of different + * sizes and good lattice structure", Mathematics of Computation, 68 + * pp. 249 - 260 (1999) + * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf + * The constant "c" meets the simple suggestion by the same reference that + * it be odd. + * + * There is also a facility for quickly advancing the state of the + * generator by a fixed number of steps - this facilitates parallel + * generation. + * + * This is based on 1.0 of rand16.c from Chris Nyberg + * <[email protected]>. 
+ * + * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Random16.java">Terasort Example</a> + */ +class Random16 { + + /** + * The "Gen" array contain powers of 2 of the linear congruential generator. + * The index 0 struct contain the "a" coefficient and "c" constant for the + * generator. That is, the generator is: + * f(x) = (Gen[0].a * x + Gen[0].c) mod 2**128 + * + * All structs after the first contain an "a" and "c" that + * comprise the square of the previous function. + * + * f**2(x) = (Gen[1].a * x + Gen[1].c) mod 2**128 + * f**4(x) = (Gen[2].a * x + Gen[2].c) mod 2**128 + * f**8(x) = (Gen[3].a * x + Gen[3].c) mod 2**128 + * ... + + */ + private static class RandomConstant { + final Unsigned16 a; + final Unsigned16 c; + public RandomConstant(String left, String right) { + a = new Unsigned16(left); + c = new Unsigned16(right); + } + } + + private static final RandomConstant[] genArray = new RandomConstant[]{ + /* [ 0] */ new RandomConstant("2360ed051fc65da44385df649fccf645", + "4a696d47726179524950202020202001"), + /* [ 1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99", + "95e0e48262b3edfe04479485c755b646"), + /* [ 2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771", + "882a02c315362b60765f100068b33a1c"), + /* [ 3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1", + "5efc4abfaca23e8ca8edb1f2dfbf6478"), + /* [ 4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1", + "f25bd15439d16af594c1b1bafa6239f0"), + /* [ 5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781", + "89ca67c29c9397d59c612596145db7e0"), + /* [ 6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01", + "8b6ae036713bd578a8093c8eae5c7fc0"), + /* [ 7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01", + "98a2542fd23d0dbdff3b886cdb1d3f80"), + /* [ 8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01", + 
"954db923fdb7933e947cd1edcecb7f00"), + /* [ 9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801", + "00be4a36657c98cd204e8c8af7dafe00"), + /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001", + "991965329dccb28d581199ab18c5fc00"), + /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001", + "e1a8705b63ad5b8cd6c3d268d5cbf800"), + /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001", + "2b657bbfd6ed9d632079e70c3c97f000"), + /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001", + "59b60ee4c52fa49e9fe90682bd2fe000"), + /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001", + "cc099c88030679464fe86aae8a5fc000"), + /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001", + "06b9abff9f9f33dd30362c0154bf8000"), + /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001", + "e296707121688d5a0260b293a97f0000"), + /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001", + "189ffc4701ff23cb8f8acf6b52fe0000"), + /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001", + "5141110ab208fb9d61fb47e6a5fc0000"), + /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001", + "3c97caa62540f2948d8d340d4bf80000"), + /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001", + "1b25cb9cfe5a0c963174f91a97f00000"), + /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001", + "0c644570b4a487103c5436352fe00000"), + /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001", + "3d0589c28869472bde517c6a5fc00000"), + /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001", + "bc95e5ab36477e65534738d4bf800000"), + /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001", + "ddb02ff72a031c01011f71a97f000000"), + /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001", + "2561426086d9acdb6c82e352fe000000"), + /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001", + "64a788e3c118ed1c8215c6a5fc000000"), + /* [ 27] */ new 
RandomConstant("830b7b3358a5d67ea49e6c7de0000001", + "e65ea321908627cfa86b8d4bf8000000"), + /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001", + "53d27225604d85f9e1d71a97f0000000"), + /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001", + "ca5ec7a3ed1fe55e07ae352fe0000000"), + /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001", + "4daebb2e085330651f5c6a5fc0000000"), + /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001", + "9d6f1a00a8f3f76e7eb8d4bf80000000"), + /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001", + "158c62f2b31e496dfd71a97f00000000"), + /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001", + "290e84a2eb15fd1ffae352fe00000000"), + /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001", + "e3dc1bfbe991a34ff5c6a5fc00000000"), + /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001", + "ddf540d020b9eadfeb8d4bf800000000"), + /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001", + "8ee4950177ce66bfd71a97f000000000"), + /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001", + "39e0f787c907117fae352fe000000000"), + /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001", + "659d2522f7b732ff5c6a5fc000000000"), + /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001", + "9e8722938612a5feb8d4bf8000000000"), + /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001", + "e941a65d66b64bfd71a97f0000000000"), + /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001", + "7b50d19437b097fae352fe0000000000"), + /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001", + "59d7b68e18712ff5c6a5fc0000000000"), + /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001", + "4087bab2d5225feb8d4bf80000000000"), + /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001", + "b470abc03b44bfd71a97f00000000000"), + /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001", + 
"366630eaba897fae352fe00000000000"), + /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001", + "a2dfc77e8512ff5c6a5fc00000000000"), + /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001", + "1e0d25a14a25feb8d4bf800000000000"), + /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001", + "9d50a5d3944bfd71a97f000000000000"), + /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001", + "bf7ab5eb2897fae352fe000000000000"), + /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001", + "925b14e6512ff5c6a5fc000000000000"), + /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001", + "724cce0ca25feb8d4bf8000000000000"), + /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001", + "1af42d1944bfd71a97f0000000000000"), + /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001", + "0f529e32897fae352fe0000000000000"), + /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001", + "844e4c6512ff5c6a5fc0000000000000"), + /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001", + "9f40d8ca25feb8d4bf80000000000000"), + /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001", + "9912b1944bfd71a97f00000000000000"), + /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001", + "9c69632897fae352fe00000000000000"), + /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001", + "e1e2c6512ff5c6a5fc00000000000000"), + /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001", + "68058ca25feb8d4bf800000000000000"), + /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001", + "610b1944bfd71a97f000000000000000"), + /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001", + "061632897fae352fe000000000000000"), + /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001", + "1c2c6512ff5c6a5fc000000000000000"), + /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001", + "7858ca25feb8d4bf8000000000000000"), + /* [ 64] */ new 
RandomConstant("5f0ba18c53cd8fbc0000000000000001", + "f0b1944bfd71a97f0000000000000000"), + /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001", + "e1632897fae352fe0000000000000000"), + /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001", + "c2c6512ff5c6a5fc0000000000000000"), + /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001", + "858ca25feb8d4bf80000000000000000"), + /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001", + "0b1944bfd71a97f00000000000000000"), + /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001", + "1632897fae352fe00000000000000000"), + /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001", + "2c6512ff5c6a5fc00000000000000000"), + /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001", + "58ca25feb8d4bf800000000000000000"), + /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001", + "b1944bfd71a97f000000000000000000"), + /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001", + "632897fae352fe000000000000000000"), + /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001", + "c6512ff5c6a5fc000000000000000000"), + /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001", + "8ca25feb8d4bf8000000000000000000"), + /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001", + "1944bfd71a97f0000000000000000000"), + /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001", + "32897fae352fe0000000000000000000"), + /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001", + "6512ff5c6a5fc0000000000000000000"), + /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001", + "ca25feb8d4bf80000000000000000000"), + /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001", + "944bfd71a97f00000000000000000000"), + /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001", + "2897fae352fe00000000000000000000"), + /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001", + 
"512ff5c6a5fc00000000000000000000"), + /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001", + "a25feb8d4bf800000000000000000000"), + /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001", + "44bfd71a97f000000000000000000000"), + /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001", + "897fae352fe000000000000000000000"), + /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001", + "12ff5c6a5fc000000000000000000000"), + /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001", + "25feb8d4bf8000000000000000000000"), + /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001", + "4bfd71a97f0000000000000000000000"), + /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001", + "97fae352fe0000000000000000000000"), + /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001", + "2ff5c6a5fc0000000000000000000000"), + /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001", + "5feb8d4bf80000000000000000000000"), + /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001", + "bfd71a97f00000000000000000000000"), + /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001", + "7fae352fe00000000000000000000000"), + /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001", + "ff5c6a5fc00000000000000000000000"), + /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001", + "feb8d4bf800000000000000000000000"), + /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001", + "fd71a97f000000000000000000000000"), + /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001", + "fae352fe000000000000000000000000"), + /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001", + "f5c6a5fc000000000000000000000000"), + /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001", + "eb8d4bf8000000000000000000000000"), + /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001", + "d71a97f0000000000000000000000000"), + /* [101] */ new 
RandomConstant("79b1f780000000000000000000000001", + "ae352fe0000000000000000000000000"), + /* [102] */ new RandomConstant("f363ef00000000000000000000000001", + "5c6a5fc0000000000000000000000000"), + /* [103] */ new RandomConstant("e6c7de00000000000000000000000001", + "b8d4bf80000000000000000000000000"), + /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001", + "71a97f00000000000000000000000000"), + /* [105] */ new RandomConstant("9b1f7800000000000000000000000001", + "e352fe00000000000000000000000000"), + /* [106] */ new RandomConstant("363ef000000000000000000000000001", + "c6a5fc00000000000000000000000000"), + /* [107] */ new RandomConstant("6c7de000000000000000000000000001", + "8d4bf800000000000000000000000000"), + /* [108] */ new RandomConstant("d8fbc000000000000000000000000001", + "1a97f000000000000000000000000000"), + /* [109] */ new RandomConstant("b1f78000000000000000000000000001", + "352fe000000000000000000000000000"), + /* [110] */ new RandomConstant("63ef0000000000000000000000000001", + "6a5fc000000000000000000000000000"), + /* [111] */ new RandomConstant("c7de0000000000000000000000000001", + "d4bf8000000000000000000000000000"), + /* [112] */ new RandomConstant("8fbc0000000000000000000000000001", + "a97f0000000000000000000000000000"), + /* [113] */ new RandomConstant("1f780000000000000000000000000001", + "52fe0000000000000000000000000000"), + /* [114] */ new RandomConstant("3ef00000000000000000000000000001", + "a5fc0000000000000000000000000000"), + /* [115] */ new RandomConstant("7de00000000000000000000000000001", + "4bf80000000000000000000000000000"), + /* [116] */ new RandomConstant("fbc00000000000000000000000000001", + "97f00000000000000000000000000000"), + /* [117] */ new RandomConstant("f7800000000000000000000000000001", + "2fe00000000000000000000000000000"), + /* [118] */ new RandomConstant("ef000000000000000000000000000001", + "5fc00000000000000000000000000000"), + /* [119] */ new RandomConstant("de000000000000000000000000000001", + 
"bf800000000000000000000000000000"), + /* [120] */ new RandomConstant("bc000000000000000000000000000001", + "7f000000000000000000000000000000"), + /* [121] */ new RandomConstant("78000000000000000000000000000001", + "fe000000000000000000000000000000"), + /* [122] */ new RandomConstant("f0000000000000000000000000000001", + "fc000000000000000000000000000000"), + /* [123] */ new RandomConstant("e0000000000000000000000000000001", + "f8000000000000000000000000000000"), + /* [124] */ new RandomConstant("c0000000000000000000000000000001", + "f0000000000000000000000000000000"), + /* [125] */ new RandomConstant("80000000000000000000000000000001", + "e0000000000000000000000000000000"), + /* [126] */ new RandomConstant("00000000000000000000000000000001", + "c0000000000000000000000000000000"), + /* [127] */ new RandomConstant("00000000000000000000000000000001", + "80000000000000000000000000000000")}; + + /** + * generate the random number that is "advance" steps + * from an initial random number of 0. This is done by + * starting with 0, and then advancing the by the + * appropriate powers of 2 of the linear congruential + * generator. + */ + public static Unsigned16 skipAhead(Unsigned16 advance) { + Unsigned16 result = new Unsigned16(); + long bit_map; + + bit_map = advance.getLow8(); + for (int i = 0; bit_map != 0 && i < 64; i++) { + if ((bit_map & (1L << i)) != 0) { + /* advance random number by f**(2**i) (x) + */ + result.multiply(genArray[i].a); + result.add(genArray[i].c); + bit_map &= ~(1L << i); + } + } + bit_map = advance.getHigh8(); + for (int i = 0; bit_map != 0 && i < 64; i++) + { + if ((bit_map & (1L << i)) != 0) { + /* advance random number by f**(2**(i + 64)) (x) + */ + result.multiply(genArray[i+64].a); + result.add(genArray[i+64].c); + bit_map &= ~(1L << i); + } + } + return result; + } + + /** + * Generate the next 16 byte random number. 
+ */ + public static void nextRand(Unsigned16 rand) { + /* advance the random number forward once using the linear congruential + * generator, and then return the new random number + */ + rand.multiply(genArray[0].a); + rand.add(genArray[0].c); + } +} diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java new file mode 100644 index 0000000..ae3b99e --- /dev/null +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/terasort/Unsigned16.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.apps.terasort; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * An unsigned 16 byte integer class that supports addition, multiplication, + * and left shifts. 
+ * + * * code copied from <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/Unsigned16.java">Terasort Example</a> + */ +class Unsigned16 implements Writable { + private long hi8; + private long lo8; + + public Unsigned16() { + hi8 = 0; + lo8 = 0; + } + + public Unsigned16(long l) { + hi8 = 0; + lo8 = l; + } + + public Unsigned16(Unsigned16 other) { + hi8 = other.hi8; + lo8 = other.lo8; + } + + @Override + public boolean equals(Object o) { + if (o instanceof Unsigned16) { + Unsigned16 other = (Unsigned16) o; + return other.hi8 == hi8 && other.lo8 == lo8; + } + return false; + } + + @Override + public int hashCode() { + return (int) lo8; + } + + /** + * Parse a hex string + * @param s the hex string + */ + public Unsigned16(String s) throws NumberFormatException { + set(s); + } + + /** + * Set the number from a hex string + * @param s the number in hexadecimal + * @throws NumberFormatException if the number is invalid + */ + public void set(String s) throws NumberFormatException { + hi8 = 0; + lo8 = 0; + final long lastDigit = 0xfl << 60; + for (int i = 0; i < s.length(); ++i) { + int digit = getHexDigit(s.charAt(i)); + if ((lastDigit & hi8) != 0) { + throw new NumberFormatException(s + " overflowed 16 bytes"); + } + hi8 <<= 4; + hi8 |= (lo8 & lastDigit) >>> 60; + lo8 <<= 4; + lo8 |= digit; + } + } + + /** + * Set the number to a given long. + * @param l the new value, which is treated as an unsigned number + */ + public void set(long l) { + lo8 = l; + hi8 = 0; + } + + /** + * Map a hexadecimal character into a digit. 
+ * @param ch the character + * @return the digit from 0 to 15 + * @throws NumberFormatException + */ + private static int getHexDigit(char ch) throws NumberFormatException { + if (ch >= '0' && ch <= '9') { + return ch - '0'; + } + if (ch >= 'a' && ch <= 'f') { + return ch - 'a' + 10; + } + if (ch >= 'A' && ch <= 'F') { + return ch - 'A' + 10; + } + throw new NumberFormatException(ch + " is not a valid hex digit"); + } + + private static final Unsigned16 TEN = new Unsigned16(10); + + public static Unsigned16 fromDecimal(String s) throws NumberFormatException { + Unsigned16 result = new Unsigned16(); + Unsigned16 tmp = new Unsigned16(); + for(int i=0; i < s.length(); i++) { + char ch = s.charAt(i); + if (ch < '0' || ch > '9') { + throw new NumberFormatException(ch + " not a valid decimal digit"); + } + int digit = ch - '0'; + result.multiply(TEN); + tmp.set(digit); + result.add(tmp); + } + return result; + } + + /** + * Return the number as a hex string. + */ + public String toString() { + if (hi8 == 0) { + return Long.toHexString(lo8); + } else { + StringBuilder result = new StringBuilder(); + result.append(Long.toHexString(hi8)); + String loString = Long.toHexString(lo8); + for(int i=loString.length(); i < 16; ++i) { + result.append('0'); + } + result.append(loString); + return result.toString(); + } + } + + /** + * Get a given byte from the number. + * @param b the byte to get with 0 meaning the most significant byte + * @return the byte or 0 if b is outside of 0..15 + */ + public byte getByte(int b) { + if (b >= 0 && b < 16) { + if (b < 8) { + return (byte) (hi8 >> (56 - 8*b)); + } else { + return (byte) (lo8 >> (120 - 8*b)); + } + } + return 0; + } + + /** + * Get the hexadecimal digit at the given position. 
+ * @param p the digit position to get with 0 meaning the most significant + * @return the character or '0' if p is outside of 0..31 + */ + public char getHexDigit(int p) { + byte digit = getByte(p / 2); + if (p % 2 == 0) { + digit >>>= 4; + } + digit &= 0xf; + if (digit < 10) { + return (char) ('0' + digit); + } else { + return (char) ('A' + digit - 10); + } + } + + /** + * Get the high 8 bytes as a long. + */ + public long getHigh8() { + return hi8; + } + + /** + * Get the low 8 bytes as a long. + */ + public long getLow8() { + return lo8; + } + + /** + * Multiple the current number by a 16 byte unsigned integer. Overflow is not + * detected and the result is the low 16 bytes of the result. The numbers + * are divided into 32 and 31 bit chunks so that the product of two chucks + * fits in the unsigned 63 bits of a long. + * @param b the other number + */ + void multiply(Unsigned16 b) { + // divide the left into 4 32 bit chunks + long[] left = new long[4]; + left[0] = lo8 & 0xffffffffl; + left[1] = lo8 >>> 32; + left[2] = hi8 & 0xffffffffl; + left[3] = hi8 >>> 32; + // divide the right into 5 31 bit chunks + long[] right = new long[5]; + right[0] = b.lo8 & 0x7fffffffl; + right[1] = (b.lo8 >>> 31) & 0x7fffffffl; + right[2] = (b.lo8 >>> 62) + ((b.hi8 & 0x1fffffffl) << 2); + right[3] = (b.hi8 >>> 29) & 0x7fffffffl; + right[4] = (b.hi8 >>> 60); + // clear the cur value + set(0); + Unsigned16 tmp = new Unsigned16(); + for(int l=0; l < 4; ++l) { + for (int r=0; r < 5; ++r) { + long prod = left[l] * right[r]; + if (prod != 0) { + int off = l*32 + r*31; + tmp.set(prod); + tmp.shiftLeft(off); + add(tmp); + } + } + } + } + + /** + * Add the given number into the current number. 
+ * @param b the other number + */ + public void add(Unsigned16 b) { + long sumHi; + long sumLo; + long reshibit, hibit0, hibit1; + + sumHi = hi8 + b.hi8; + + hibit0 = (lo8 & 0x8000000000000000L); + hibit1 = (b.lo8 & 0x8000000000000000L); + sumLo = lo8 + b.lo8; + reshibit = (sumLo & 0x8000000000000000L); + if ((hibit0 & hibit1) != 0 | ((hibit0 ^ hibit1) != 0 && reshibit == 0)) + sumHi++; /* add carry bit */ + hi8 = sumHi; + lo8 = sumLo; + } + + /** + * Shift the number a given number of bit positions. The number is the low + * order bits of the result. + * @param bits the bit positions to shift by + */ + public void shiftLeft(int bits) { + if (bits != 0) { + if (bits < 64) { + hi8 <<= bits; + hi8 |= (lo8 >>> (64 - bits)); + lo8 <<= bits; + } else if (bits < 128) { + hi8 = lo8 << (bits - 64); + lo8 = 0; + } else { + hi8 = 0; + lo8 = 0; + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + hi8 = in.readLong(); + lo8 = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(hi8); + out.writeLong(lo8); + } + + +} diff --git a/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala new file mode 100644 index 0000000..f86a86e --- /dev/null +++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraApp.scala @@ -0,0 +1,138 @@ +package org.apache.wayang.apps.terasort + +import org.apache.wayang.apps.util.{ExperimentDescriptor, Parameters, ProfileDBHelper} +import org.apache.wayang.apps.wordcount.WordCountScala +import org.apache.wayang.core.api.Configuration +import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval +import org.apache.wayang.core.util.fs.FileSystems + +import java.util.regex.Pattern +import scala.util.matching.Regex + +object TeraApp extends ExperimentDescriptor { + + val KEY_LEN = 10 + val VALUE_LEN = 100 + val RECORD_LEN : Int = KEY_LEN + 
/**
 * Command-line entry point of the TeraSort benchmark.
 *
 * Expected positional arguments:
 *  0. experiment parameters (see `Parameters.experimentHelp`)
 *  1. comma-separated plugins
 *  2. task: `generate`, `sort` or `validate`
 *  3. file size, e.g. `10G` (see [[sizeStrToBytes]])
 *  4. number of partitions
 *  5. input file, or the literal `null`
 *  6. output file, or the literal `null` (optional)
 */
object TeraApp extends ExperimentDescriptor {

  val KEY_LEN = 10
  val VALUE_LEN = 100
  val RECORD_LEN : Int = KEY_LEN + VALUE_LEN

  /** Number of leading arguments that `main` reads unconditionally (indices 0 to 5). */
  private val MIN_ARGS = 6

  /** A decimal number followed by a one-letter unit; compiled once and reused. */
  private val SIZE_PATTERN: Pattern = Pattern.compile("(\\d+(\\.\\d+)?)([BkKmMgGtT])")

  override def version = "0.1.0"

  def main(args: Array[String]): Unit = {
    // Parse args. Everything up to the input file is mandatory.
    if (args.length < MIN_ARGS) {
      println(s"Usage: " +
        s"${Parameters.experimentHelp} " +
        s"<plugin(,plugin)*> " +
        s"<task could be[generate|sort|validate]> " +
        s"<file size ([0-9]+(.[0-9]+)?)([B|k|K|m|M|g|G|t|T])> " +
        s"<partitions> " +
        s"<input file if not value is null> " +
        s"<output file if not value is null>")
      sys.exit(1)
    }
    implicit val configuration = new Configuration
    implicit val experiment = Parameters.createExperiment(args(0), this)
    val plugins = Parameters.loadPlugins(args(1))
    experiment.getSubject.addConfiguration("plugins", args(1))
    val task = args(2)
    experiment.getSubject.addConfiguration("task", task)
    val fileSize = sizeStrToBytes(args(3))
    experiment.getSubject.addConfiguration("fileSize", fileSize)
    val partitions = args(4).toInt
    experiment.getSubject.addConfiguration("partitions", partitions)
    val input_file = nullIfLiteral(args(5))
    // The output file is optional; only read it when it was actually supplied.
    val output_file = if (args.length > MIN_ARGS) nullIfLiteral(args(MIN_ARGS)) else null
    experiment.getSubject.addConfiguration("inputFile", input_file)
    experiment.getSubject.addConfiguration("outputFile", output_file)

    task match {
      case "generate" => new TeraGen(plugins: _*).apply(output_file, fileSize, partitions)
      // Accepted but currently no-ops.
      case "sort" | "validate" => ()
      case other => throw new IllegalArgumentException(
        s"Unknown task '$other', expected one of generate|sort|validate")
    }
  }

  /** Maps the command-line placeholder `"null"` to a real `null`; any other value is returned as is. */
  private def nullIfLiteral(arg: String): String = if (arg.equals("null")) null else arg

  /**
   * Convert the string format ([0-9]+(.[0-9]+)?)([B|k|K|m|M|g|G|t|T]) to the
   * number of bytes
   *
   * B = Bytes
   * k|K = Kilobytes (1_024 Bytes)
   * m|M = Megabytes (1_048_576 Bytes)
   * g|G = Gigabytes (1_073_741_824 Bytes)
   * t|T = Terabytes (1_099_511_627_776 Bytes)
   *
   * The first occurrence of the pattern inside `str` is used; the fractional
   * part of the resulting byte count is truncated.
   *
   * @param str in the format
   * @return number equivalent to the byte
   * @throws IllegalArgumentException if `str` contains no size in the expected format
   */
  def sizeStrToBytes(str: String): Long = {
    val groups = SIZE_PATTERN.matcher(str)
    // group() is only legal after a successful match, so fail with a clear message first
    if (!groups.find()) {
      throw new IllegalArgumentException(
        s"Cannot parse size '$str', expected ([0-9]+(.[0-9]+)?)([B|k|K|m|M|g|G|t|T])")
    }

    val number_part: Double = groups.group(1).toDouble
    val letter_part: String = groups.group(3)

    // the pattern only admits the letters handled below
    val conversion = letter_part match {
      case "B" => 1L //2^0
      case "k" | "K" => 1024L //2^10
      case "m" | "M" => 1048576L //2^20
      case "g" | "G" => 1073741824L //2^30
      case "t" | "T" => 1099511627776L //2^40
    }
    (number_part * conversion).toLong
  }

  /**
   * take a number that represent a size on bytes return the human readable version
   *
   * The largest unit that fits at least once is used and the quotient is
   * truncated, e.g. 1024 gives "1KB" and 1536 gives "1KB".
   *
   * @param size number that represent the size
   * @return human readable version of the size
   */
  def sizeToSizeStr(size: Long): String = {
    val kbScale: Long = 1024L
    val mbScale: Long = 1024L * kbScale
    val gbScale: Long = 1024L * mbScale
    val tbScale: Long = 1024L * gbScale

    if (size >= tbScale) {
      size / tbScale + "TB"
    } else if (size >= gbScale) {
      size / gbScale + "GB"
    } else if (size >= mbScale) {
      size / mbScale + "MB"
    } else if (size >= kbScale) {
      size / kbScale + "KB"
    } else {
      size + "B"
    }
  }

}
100644 index 0000000..89ac49d --- /dev/null +++ b/wayang-benchmark/src/main/scala/org/apache/wayang/apps/terasort/TeraGen.scala @@ -0,0 +1,111 @@
package org.apache.wayang.apps.terasort

import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.plugin.Plugin

/**
 * Wayang job that generates the input data of the TeraSort benchmark and
 * writes it as an object file of (key, value) byte-array pairs.
 *
 * @param plugins the Wayang plugins to register with the execution context
 */
class TeraGen(@transient plugins: Plugin*) extends Serializable {

  /**
   * Build and run the generation plan.
   *
   * @param output_url    where the generated records are written
   * @param file_size     requested amount of data in bytes
   * @param partitions    number of chunks the generation is split into; must be positive
   * @param configuration Wayang configuration used to create the context
   * @param experiment    experiment the job is attached to
   * @throws IllegalArgumentException if `partitions` is not positive or a single
   *                                  partition would hold `Int.MaxValue` records or more
   */
  def apply(output_url: String, file_size: Long, partitions: Int)
           (implicit configuration: Configuration, experiment: Experiment) = {
    require(partitions > 0, s"partitions must be positive, got $partitions")

    val wayangCtx = new WayangContext(configuration)
    plugins.foreach(wayangCtx.register)
    val planBuilder = new PlanBuilder(wayangCtx)

    val parts = partitions
    // Every emitted record is RECORD_LEN bytes long (key plus value), so that
    // is the divisor that turns the requested size into a number of records.
    val recordsPerPartition = file_size / TeraApp.RECORD_LEN / parts.toLong
    val numRecords = recordsPerPartition * parts.toLong

    require(recordsPerPartition < Int.MaxValue, s"records per partition > ${Int.MaxValue}")

    println("===========================================================================")
    println("===========================================================================")
    println(s"Input size: $file_size")
    println(s"Total number of records: $numRecords")
    println(s"Number of output partitions: $parts")
    println("Number of records/output partition: " + (numRecords / parts))
    println("===========================================================================")
    println("===========================================================================")

    planBuilder
      .withJobName(s"Terasort generate ${file_size}")
      .withExperiment(experiment)
      .withUdfJarsOf(this.getClass)
      // Zero-based partition indices: partition i produces the record numbers
      // [i * recordsPerPartition, (i + 1) * recordsPerPartition).
      .loadCollection(0 until parts)
      .flatMap(index => {
        val one = new Unsigned16(1)
        val firstRecordNumber = new Unsigned16(index.toLong * recordsPerPartition)
        // Running counter, advanced by one after every generated record.
        val recordNumber = new Unsigned16(firstRecordNumber)

        // NOTE(review): presumably positions the generator at the state that
        // belongs to the first record of this partition; confirm in Random16.
        val rand = Random16.skipAhead(firstRecordNumber)

        Iterator.tabulate(recordsPerPartition.toInt) { _ =>
          val rowBytes: Array[Byte] = new Array[Byte](TeraApp.RECORD_LEN)
          Random16.nextRand(rand)
          generateRecord(rowBytes, rand, recordNumber)
          recordNumber.add(one)
          // Key = leading KEY_LEN bytes, value = trailing VALUE_LEN bytes.
          (rowBytes.take(TeraApp.KEY_LEN), rowBytes.takeRight(TeraApp.VALUE_LEN))
        }.toStream
      })
      .writeObjectFile(output_url)
  }

  /**
   * Generate a binary record suitable for all sort benchmarks except PennySort.
   *
   * Bytes 0 to 99 of `recBuf` are overwritten, so the buffer must hold at
   * least 100 bytes.
   *
   * @param recBuf       record to return
   * @param rand         128-bit random number the key and the filler are taken from
   * @param recordNumber 128-bit number of the record, written as hexadecimal text
   */
  def generateRecord(recBuf: Array[Byte], rand: Unsigned16, recordNumber: Unsigned16): Unit = {
    // Generate the 10-byte key using the high 10 bytes of the 128-bit random number
    var i = 0
    while (i < 10) {
      recBuf(i) = rand.getByte(i)
      i += 1
    }

    // Add 2 bytes of "break"
    recBuf(10) = 0x00.toByte
    recBuf(11) = 0x11.toByte

    // Convert the 128-bit record number to 32 digits of ascii hexadecimal
    // as the next 32 bytes of the record.
    i = 0
    while (i < 32) {
      recBuf(12 + i) = recordNumber.getHexDigit(i).toByte
      i += 1
    }

    // Add 4 bytes of "break" data
    recBuf(44) = 0x88.toByte
    recBuf(45) = 0x99.toByte
    recBuf(46) = 0xAA.toByte
    recBuf(47) = 0xBB.toByte

    // Add 48 bytes of filler based on low 48 bits of random number:
    // each of the 12 hex digits is repeated four times.
    i = 0
    while (i < 12) {
      val v = rand.getHexDigit(20 + i).toByte
      recBuf(48 + i * 4) = v
      recBuf(49 + i * 4) = v
      recBuf(50 + i * 4) = v
      recBuf(51 + i * 4) = v
      i += 1
    }

    // Add 4 bytes of "break" data
    recBuf(96) = 0xCC.toByte
    recBuf(97) = 0xDD.toByte
    recBuf(98) = 0xEE.toByte
    recBuf(99) = 0xFF.toByte
  }

}
