Updated Branches:
  refs/heads/branch-0.8 daaaee175 -> 31da065b1
Merge pull request #95 from aarondav/perftest

Minor: Put StoragePerfTester in org/apache/

(cherry picked from commit a51359c917a9ebe379b32ebc53fd093c454ea195)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/31da065b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/31da065b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/31da065b

Branch: refs/heads/branch-0.8
Commit: 31da065b1d08c1fad5283e4bcf8e0ed01818c03e
Parents: daaaee1
Author: Reynold Xin <[email protected]>
Authored: Mon Oct 21 20:33:29 2013 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Dec 4 14:01:13 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/StoragePerfTester.scala       | 84 ++++++++++++++++++++
 .../scala/spark/storage/StoragePerfTester.scala | 84 --------------------
 2 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31da065b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
new file mode 100644
index 0000000..68893a2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.storage
+
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CountDownLatch, Executors}
+
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
+
+/** Utility for micro-benchmarking shuffle write performance.
+  *
+  * Writes simulated shuffle output from several threads and records the observed throughput*/
+object StoragePerfTester {
+  def main(args: Array[String]) = {
+    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
+
+    /** Number of map tasks. All tasks execute concurrently. */
+    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
+
+    /** Number of reduce splits for each map task. */
+    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
+
+    val recordLength = 1000 // ~1KB records
+    val totalRecords = dataSizeMb * 1000
+    val recordsPerMap = totalRecords / numMaps
+
+    val writeData = "1" * recordLength
+    val executor = Executors.newFixedThreadPool(numMaps)
+
+    System.setProperty("spark.shuffle.compress", "false")
+    System.setProperty("spark.shuffle.sync", "true")
+
+    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
+    val sc = new SparkContext("local[4]", "Write Tester")
+    val blockManager = sc.env.blockManager
+
+    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+        new KryoSerializer())
+      val writers = shuffle.writers
+      for (i <- 1 to recordsPerMap) {
+        writers(i % numOutputSplits).write(writeData)
+      }
+      writers.map {w =>
+        w.commit()
+        total.addAndGet(w.fileSegment().length)
+        w.close()
+      }
+
+      shuffle.releaseWriters(true)
+    }
+
+    val start = System.currentTimeMillis()
+    val latch = new CountDownLatch(numMaps)
+    val totalBytes = new AtomicLong()
+    for (task <- 1 to numMaps) {
+      executor.submit(new Runnable() {
+        override def run() = {
+          try {
+            writeOutputBytes(task, totalBytes)
+            latch.countDown()
+          } catch {
+            case e: Exception =>
+              println("Exception in child thread: " + e + " " + e.getMessage)
+              System.exit(1)
+          }
+        }
+      })
+    }
+    latch.await()
+    val end = System.currentTimeMillis()
+    val time = (end - start) / 1000.0
+    val bytesPerSecond = totalBytes.get() / time
+    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
+
+    executor.shutdown()
+    sc.stop()
+  }
+}


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31da065b/core/src/main/scala/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/StoragePerfTester.scala b/core/src/main/scala/spark/storage/StoragePerfTester.scala
deleted file mode 100644
index 68893a2..0000000
--- a/core/src/main/scala/spark/storage/StoragePerfTester.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.spark.storage
-
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{CountDownLatch, Executors}
-
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
-import org.apache.spark.util.Utils
-
-/** Utility for micro-benchmarking shuffle write performance.
-  *
-  * Writes simulated shuffle output from several threads and records the observed throughput*/
-object StoragePerfTester {
-  def main(args: Array[String]) = {
-    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
-    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
-    /** Number of map tasks. All tasks execute concurrently. */
-    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
-    /** Number of reduce splits for each map task. */
-    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
-    val recordLength = 1000 // ~1KB records
-    val totalRecords = dataSizeMb * 1000
-    val recordsPerMap = totalRecords / numMaps
-
-    val writeData = "1" * recordLength
-    val executor = Executors.newFixedThreadPool(numMaps)
-
-    System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.shuffle.sync", "true")
-
-    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester")
-    val blockManager = sc.env.blockManager
-
-    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
-      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
-        new KryoSerializer())
-      val writers = shuffle.writers
-      for (i <- 1 to recordsPerMap) {
-        writers(i % numOutputSplits).write(writeData)
-      }
-      writers.map {w =>
-        w.commit()
-        total.addAndGet(w.fileSegment().length)
-        w.close()
-      }
-
-      shuffle.releaseWriters(true)
-    }
-
-    val start = System.currentTimeMillis()
-    val latch = new CountDownLatch(numMaps)
-    val totalBytes = new AtomicLong()
-    for (task <- 1 to numMaps) {
-      executor.submit(new Runnable() {
-        override def run() = {
-          try {
-            writeOutputBytes(task, totalBytes)
-            latch.countDown()
-          } catch {
-            case e: Exception =>
-              println("Exception in child thread: " + e + " " + e.getMessage)
-              System.exit(1)
-          }
-        }
-      })
-    }
-    latch.await()
-    val end = System.currentTimeMillis()
-    val time = (end - start) / 1000.0
-    val bytesPerSecond = totalBytes.get() / time
-    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
-    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
-    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
-    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
-
-    executor.shutdown()
-    sc.stop()
-  }
-}
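
----------------------------------------------------------------------
For reference, a rough sketch of what the benchmark's default configuration
works out to (OUTPUT_DATA=1g, NUM_MAPS=8, NUM_REDUCERS=500). The object below
is hypothetical and not part of this commit; it just mirrors the tool's own
arithmetic and assumes its ~1 KB-per-record approximation, ignoring any
serializer framing, so the totals are estimates rather than measured sizes.

// Hypothetical helper, not part of this patch: back-of-the-envelope numbers
// for StoragePerfTester's defaults.
object StoragePerfTesterDefaults {
  def main(args: Array[String]): Unit = {
    val dataSizeMb      = 1024                     // "1g" converted to MB
    val numMaps         = 8                        // NUM_MAPS default
    val numOutputSplits = 500                      // NUM_REDUCERS default
    val recordLength    = 1000                     // approx. bytes per record

    val totalRecords  = dataSizeMb * 1000          // 1,024,000 records
    val recordsPerMap = totalRecords / numMaps     // 128,000 records per map task
    val filesTotal    = numMaps * numOutputSplits  // 4,000 shuffle files
    val approxBytes   = totalRecords.toLong * recordLength  // ~1.02 GB written in total
    val bytesPerFile  = approxBytes / filesTotal            // ~256 KB per shuffle file

    println(s"records_per_map = $recordsPerMap")
    println(s"files_total     = $filesTotal")
    println(s"approx_total    = $approxBytes bytes")
    println(s"bytes_per_file  = $bytesPerFile bytes (approx.)")
  }
}

The tool itself derives bytes_per_file and agg_throughput from the FileSegment
lengths reported by the shuffle writers, so the measured figures it prints may
differ somewhat from this idealized estimate.
----------------------------------------------------------------------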
