Repository: incubator-systemml Updated Branches: refs/heads/master 20b1399f1 -> 354ec0217
[SYSTEMML-240] Add instrumentation for spark collect, broadcast & parallelize Closes #80. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/354ec021 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/354ec021 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/354ec021 Branch: refs/heads/master Commit: 354ec0217b60df2104b52663c9cc059e022b9110 Parents: 20b1399 Author: Nakul Jindal <[email protected]> Authored: Mon Mar 7 11:36:41 2016 -0800 Committer: Deron Eriksson <[email protected]> Committed: Mon Mar 7 11:36:41 2016 -0800 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 53 +++++++++++++++++--- .../java/org/apache/sysml/utils/Statistics.java | 44 ++++++++++++++++ 2 files changed, 90 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/354ec021/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 45cb948..4f9c918 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -353,7 +353,9 @@ public class SparkExecutionContext extends ExecutionContext @SuppressWarnings("unchecked") public PartitionedBroadcastMatrix getBroadcastForVariable( String varname ) throws DMLRuntimeException, DMLUnsupportedOperationException - { + { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + MatrixObject mo = getMatrixObject(varname); PartitionedBroadcastMatrix bret = null; @@ -400,6 +402,11 @@ public class SparkExecutionContext extends ExecutionContext mo.setBroadcastHandle(bchandle); } + if (DMLScript.STATISTICS) { + Statistics.accSparkBroadCastTime(System.nanoTime() - t0); + Statistics.incSparkBroadcastCount(1); + } + return bret; } @@ -451,7 +458,8 @@ public class SparkExecutionContext extends ExecutionContext */ public static JavaPairRDD<MatrixIndexes,MatrixBlock> toJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) throws DMLRuntimeException, DMLUnsupportedOperationException - { + { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>(); if( src.getNumRows() <= brlen @@ -485,7 +493,13 @@ public class SparkExecutionContext extends ExecutionContext } } - return sc.parallelizePairs(list); + JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list); + if (DMLScript.STATISTICS) { + Statistics.accSparkParallelizeTime(System.nanoTime() - t0); + Statistics.incSparkParallelizeCount(1); + } + + return result; } /** @@ -523,12 +537,16 @@ public class SparkExecutionContext extends ExecutionContext public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) throws DMLRuntimeException { + + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + MatrixBlock out = null; if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK { //special case without copy and nnz maintenance List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); + if( list.size()>1 ) throw new DMLRuntimeException("Expecting no more than one result block."); else if( list.size()==1 ) @@ -541,9 +559,10 @@ public class SparkExecutionContext extends ExecutionContext //determine target sparse/dense representation long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen; boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz); - + //create output matrix block (w/ lazy allocation) out = new MatrixBlock(rlen, clen, sparse); + List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); //copy blocks one-at-a-time into output matrix block @@ -577,6 +596,11 @@ public class SparkExecutionContext extends ExecutionContext out.examSparsity(); } + if (DMLScript.STATISTICS) { + Statistics.accSparkCollectTime(System.nanoTime() - t0); + Statistics.incSparkCollectCount(1); + } + return out; } @@ -602,15 +626,18 @@ public class SparkExecutionContext extends ExecutionContext */ public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz) throws DMLRuntimeException - { + { + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + MatrixBlock out = null; //determine target sparse/dense representation long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen; boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz); - + //create output matrix block (w/ lazy allocation) out = new MatrixBlock(rlen, clen, sparse); + List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect(); //copy blocks one-at-a-time into output matrix block @@ -631,6 +658,11 @@ public class SparkExecutionContext extends ExecutionContext out.recomputeNonZeros(); out.examSparsity(); + if (DMLScript.STATISTICS) { + Statistics.accSparkCollectTime(System.nanoTime() - t0); + Statistics.incSparkCollectCount(1); + } + return out; } @@ -648,8 +680,10 @@ public class SparkExecutionContext extends ExecutionContext public static PartitionedMatrixBlock toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) throws DMLRuntimeException { - PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, clen, brlen, bclen); + long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; + + PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, clen, brlen, bclen); List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); //copy blocks one-at-a-time into output matrix block @@ -660,6 +694,11 @@ public class SparkExecutionContext extends ExecutionContext MatrixBlock block = keyval._2(); out.setMatrixBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block); } + + if (DMLScript.STATISTICS) { + Statistics.accSparkCollectTime(System.nanoTime() - t0); + Statistics.incSparkCollectCount(1); + } return out; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/354ec021/src/main/java/org/apache/sysml/utils/Statistics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/utils/Statistics.java b/src/main/java/org/apache/sysml/utils/Statistics.java index 167f6db..ce133ff 100644 --- a/src/main/java/org/apache/sysml/utils/Statistics.java +++ b/src/main/java/org/apache/sysml/utils/Statistics.java @@ -77,6 +77,15 @@ public class Statistics //Spark-specific stats private static long sparkCtxCreateTime = 0; + private static AtomicLong sparkParallelize = new AtomicLong(0L); + private static AtomicLong sparkParallelizeCount = new AtomicLong(0L); + private static AtomicLong sparkCollect = new AtomicLong(0L); + private static AtomicLong sparkCollectCount = new AtomicLong(0L); + private static AtomicLong sparkBroadcast = new AtomicLong(0L); + private static AtomicLong sparkBroadcastCount = new AtomicLong(0L); + + + //PARFOR optimization stats private static long parforOptTime = 0; //in milli sec @@ -92,6 +101,8 @@ public class Statistics private static AtomicLong lTotalLix = new AtomicLong(0); private static AtomicLong lTotalLixUIP = new AtomicLong(0); + + public static synchronized void setNoOfExecutedMRJobs(int iNoOfExecutedMRJobs) { Statistics.iNoOfExecutedMRJobs = iNoOfExecutedMRJobs; } @@ -346,6 +357,31 @@ public class Statistics sparkCtxCreateTime = ns; } + public static void accSparkParallelizeTime(long t) { + sparkParallelize.addAndGet(t); + } + + public static void incSparkParallelizeCount(long c) { + sparkParallelizeCount.addAndGet(c); + } + + public static void accSparkCollectTime(long t) { + sparkCollect.addAndGet(t); + } + + public static void incSparkCollectCount(long c) { + sparkCollectCount.addAndGet(c); + } + + public static void accSparkBroadCastTime(long t) { + sparkBroadcast.addAndGet(t); + } + + public static void incSparkBroadcastCount(long c) { + sparkBroadcastCount.addAndGet(c); + } + + public static String getCPHeavyHitterCode( Instruction inst ) { String opcode = null; @@ -549,6 +585,14 @@ public class Statistics String lazy = SparkExecutionContext.isLazySparkContextCreation() ? "(lazy)" : "(eager)"; sb.append("Spark ctx create time "+lazy+":\t"+ String.format("%.3f", ((double)sparkCtxCreateTime)*1e-9) + " sec.\n" ); // nanoSec --> sec + + sb.append("Spark trans counts (par,bc,col):" + + String.format("%d/%d/%d.\n", sparkParallelizeCount.get(), sparkBroadcastCount.get(), sparkCollectCount.get())); + sb.append("Spark trans times (par,bc,col):\t" + + String.format("%.3f/%.3f/%.3f secs.\n", + ((double)sparkParallelize.get())*1e-9, + ((double)sparkBroadcast.get())*1e-9, + ((double)sparkCollect.get())*1e-9)); } if( parforOptCount>0 ){ sb.append("ParFor loops optimized:\t\t" + getParforOptCount() + ".\n");
