Repository: incubator-systemml
Updated Branches:
  refs/heads/master 20b1399f1 -> 354ec0217


[SYSTEMML-240] Add instrumentation for spark collect, broadcast & parallelize

Closes #80.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/354ec021
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/354ec021
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/354ec021

Branch: refs/heads/master
Commit: 354ec0217b60df2104b52663c9cc059e022b9110
Parents: 20b1399
Author: Nakul Jindal <[email protected]>
Authored: Mon Mar 7 11:36:41 2016 -0800
Committer: Deron Eriksson <[email protected]>
Committed: Mon Mar 7 11:36:41 2016 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 53 +++++++++++++++++---
 .../java/org/apache/sysml/utils/Statistics.java | 44 ++++++++++++++++
 2 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/354ec021/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 45cb948..4f9c918 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -353,7 +353,9 @@ public class SparkExecutionContext extends ExecutionContext
        @SuppressWarnings("unchecked")
        public PartitionedBroadcastMatrix getBroadcastForVariable( String 
varname ) 
                throws DMLRuntimeException, DMLUnsupportedOperationException
-       {
+       {               
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+
                MatrixObject mo = getMatrixObject(varname);
                
                PartitionedBroadcastMatrix bret = null;
@@ -400,6 +402,11 @@ public class SparkExecutionContext extends ExecutionContext
                        mo.setBroadcastHandle(bchandle);
                }
                
+               if (DMLScript.STATISTICS) {
+                       Statistics.accSparkBroadCastTime(System.nanoTime() - 
t0);
+                       Statistics.incSparkBroadcastCount(1);
+               }
+               
                return bret;
        }
        
@@ -451,7 +458,8 @@ public class SparkExecutionContext extends ExecutionContext
         */
        public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) 
                throws DMLRuntimeException, DMLUnsupportedOperationException
-       {
+       {       
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new 
LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>();
                
                if(    src.getNumRows() <= brlen 
@@ -485,7 +493,13 @@ public class SparkExecutionContext extends ExecutionContext
                                }
                }
                
-               return sc.parallelizePairs(list);
+               JavaPairRDD<MatrixIndexes,MatrixBlock> result = 
sc.parallelizePairs(list);
+               if (DMLScript.STATISTICS) {
+                       Statistics.accSparkParallelizeTime(System.nanoTime() - 
t0);
+                       Statistics.incSparkParallelizeCount(1);
+               }
+               
+               return result;
        }
        
        /**
@@ -523,12 +537,16 @@ public class SparkExecutionContext extends 
ExecutionContext
        public static MatrixBlock 
toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, 
int brlen, int bclen, long nnz) 
                throws DMLRuntimeException
        {
+               
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+
                MatrixBlock out = null;
                
                if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
                {
                        //special case without copy and nnz maintenance
                        List<Tuple2<MatrixIndexes,MatrixBlock>> list = 
rdd.collect();
+                       
                        if( list.size()>1 )
                                throw new DMLRuntimeException("Expecting no 
more than one result block.");
                        else if( list.size()==1 )
@@ -541,9 +559,10 @@ public class SparkExecutionContext extends ExecutionContext
                        //determine target sparse/dense representation
                        long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
                        boolean sparse = 
MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-                       
+                                               
                        //create output matrix block (w/ lazy allocation)
                        out = new MatrixBlock(rlen, clen, sparse);
+                       
                        List<Tuple2<MatrixIndexes,MatrixBlock>> list = 
rdd.collect();
                        
                        //copy blocks one-at-a-time into output matrix block
@@ -577,6 +596,11 @@ public class SparkExecutionContext extends ExecutionContext
                        out.examSparsity();
                }
                
+               if (DMLScript.STATISTICS) {
+                       Statistics.accSparkCollectTime(System.nanoTime() - t0);
+                       Statistics.incSparkCollectCount(1);
+               }
+               
                return out;
        }
        
@@ -602,15 +626,18 @@ public class SparkExecutionContext extends 
ExecutionContext
         */
        public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, 
MatrixCell> rdd, int rlen, int clen, long nnz) 
                throws DMLRuntimeException
-       {
+       {       
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+
                MatrixBlock out = null;
                
                //determine target sparse/dense representation
                long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
                boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, 
clen, lnnz);
-               
+                               
                //create output matrix block (w/ lazy allocation)
                out = new MatrixBlock(rlen, clen, sparse);
+               
                List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect();
                
                //copy blocks one-at-a-time into output matrix block
@@ -631,6 +658,11 @@ public class SparkExecutionContext extends ExecutionContext
                out.recomputeNonZeros();
                out.examSparsity();
                
+               if (DMLScript.STATISTICS) {
+                       Statistics.accSparkCollectTime(System.nanoTime() - t0);
+                       Statistics.incSparkCollectCount(1);
+               }
+               
                return out;
        }
        
@@ -648,8 +680,10 @@ public class SparkExecutionContext extends ExecutionContext
        public static PartitionedMatrixBlock 
toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, 
int clen, int brlen, int bclen, long nnz) 
                throws DMLRuntimeException
        {
-               PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, 
clen, brlen, bclen);
                
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+
+               PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, 
clen, brlen, bclen);
                List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
                
                //copy blocks one-at-a-time into output matrix block
@@ -660,6 +694,11 @@ public class SparkExecutionContext extends ExecutionContext
                        MatrixBlock block = keyval._2();
                        out.setMatrixBlock((int)ix.getRowIndex(), 
(int)ix.getColumnIndex(), block);
                }
+               
+               if (DMLScript.STATISTICS) {
+                       Statistics.accSparkCollectTime(System.nanoTime() - t0);
+                       Statistics.incSparkCollectCount(1);
+               }
                                
                return out;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/354ec021/src/main/java/org/apache/sysml/utils/Statistics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/Statistics.java 
b/src/main/java/org/apache/sysml/utils/Statistics.java
index 167f6db..ce133ff 100644
--- a/src/main/java/org/apache/sysml/utils/Statistics.java
+++ b/src/main/java/org/apache/sysml/utils/Statistics.java
@@ -77,6 +77,15 @@ public class Statistics
        
        //Spark-specific stats
        private static long sparkCtxCreateTime = 0; 
+       private static AtomicLong sparkParallelize = new AtomicLong(0L);
+       private static AtomicLong sparkParallelizeCount = new AtomicLong(0L);
+       private static AtomicLong sparkCollect = new AtomicLong(0L);
+       private static AtomicLong sparkCollectCount = new AtomicLong(0L);
+       private static AtomicLong sparkBroadcast = new AtomicLong(0L);
+       private static AtomicLong sparkBroadcastCount = new AtomicLong(0L);
+       
+
+       
 
        //PARFOR optimization stats 
        private static long parforOptTime = 0; //in milli sec
@@ -92,6 +101,8 @@ public class Statistics
        private static AtomicLong lTotalLix = new AtomicLong(0);
        private static AtomicLong lTotalLixUIP = new AtomicLong(0);
        
+       
+       
        public static synchronized void setNoOfExecutedMRJobs(int 
iNoOfExecutedMRJobs) {
                Statistics.iNoOfExecutedMRJobs = iNoOfExecutedMRJobs;
        }
@@ -346,6 +357,31 @@ public class Statistics
                sparkCtxCreateTime = ns;
        }
        
+       public static void accSparkParallelizeTime(long t) {
+               sparkParallelize.addAndGet(t);
+       }
+
+       public static void incSparkParallelizeCount(long c) {
+               sparkParallelizeCount.addAndGet(c);
+       }
+
+       public static void accSparkCollectTime(long t) {
+               sparkCollect.addAndGet(t);
+       }
+
+       public static void incSparkCollectCount(long c) {
+               sparkCollectCount.addAndGet(c);
+       }
+
+       public static void accSparkBroadCastTime(long t) {
+               sparkBroadcast.addAndGet(t);
+       }
+
+       public static void incSparkBroadcastCount(long c) {
+               sparkBroadcastCount.addAndGet(c);
+       }
+       
+       
        public static String getCPHeavyHitterCode( Instruction inst )
        {
                String opcode = null;
@@ -549,6 +585,14 @@ public class Statistics
                                String lazy = 
SparkExecutionContext.isLazySparkContextCreation() ? "(lazy)" : "(eager)";
                                sb.append("Spark ctx create time "+lazy+":\t"+
                                                String.format("%.3f", 
((double)sparkCtxCreateTime)*1e-9)  + " sec.\n" ); // nanoSec --> sec
+                               
+                               sb.append("Spark trans counts (par,bc,col):" +
+                                               String.format("%d/%d/%d.\n", 
sparkParallelizeCount.get(), sparkBroadcastCount.get(), 
sparkCollectCount.get()));
+                               sb.append("Spark trans times (par,bc,col):\t" +
+                                               String.format("%.3f/%.3f/%.3f 
secs.\n", 
+                                                                
((double)sparkParallelize.get())*1e-9,
+                                                                
((double)sparkBroadcast.get())*1e-9,
+                                                                
((double)sparkCollect.get())*1e-9));
                        }
                        if( parforOptCount>0 ){
                                sb.append("ParFor loops optimized:\t\t" + 
getParforOptCount() + ".\n");

Reply via email to