[SYSTEMML-925] Performance binary-to-csv frame conversion (w/o sort) 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/69a78581
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/69a78581
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/69a78581

Branch: refs/heads/master
Commit: 69a78581e8cc4bcc6dec5ecc88d9dad6aec96297
Parents: 0e8f1e1
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Sat Sep 17 01:57:32 2016 +0200
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Sat Sep 17 00:25:19 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     | 57 ++++++++++++++++++--
 1 file changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/69a78581/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index faf8ba1..b541242 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -156,7 +156,7 @@ public class FrameRDDConverterUtils
                JavaPairRDD<Long,FrameBlock> input = in;
                
                //sort if required (on blocks/rows)
-               if( strict ) {
+               if( strict && !isSorted(input) ) {
                        input = input.sortByKey(true);
                }
                
@@ -454,7 +454,7 @@ public class FrameRDDConverterUtils
                JavaRDD<String> dataRdd = sc.textFile(fnameIn);
                return dataRdd.map(new RowGenerator(schema, delim));
        }
-       
+
        /* 
         * Row Generator class based on individual line in CSV file.
         */
@@ -480,8 +480,50 @@ public class FrameRDDConverterUtils
                      return RowFactory.create(objects);
                }
        }
-       
 
+       /**
+        * Checks if the rdd is already sorted by key (ascending) in order to
+        * avoid unnecessary sampling, shuffle, and sort per partition.
+        * 
+        * Each non-empty partition reports its key range as the pair
+        * [min, max], or a deliberately decreasing marker pair if its keys are
+        * not in ascending order; collect returns these pairs in partition
+        * order. The rdd is sorted iff this concatenated sequence is
+        * non-decreasing, i.e., every partition is sorted internally and its
+        * smallest key is not smaller than the largest key of the preceding
+        * non-empty partition. Comparing per-partition maxima alone is not
+        * sufficient: partitions [1,5] and [3,6] have ascending maxima but
+        * overlapping key ranges.
+        * 
+        * @param in rdd of (key, frame block) pairs
+        * @return true if all keys are in ascending order across partitions
+        */
+       private static boolean isSorted(JavaPairRDD<Long, FrameBlock> in) {
+               //collect [min,max] per non-empty partition, in partition order
+               List<Long> bounds = in.keys().mapPartitions(
+                               new SortingAnalysisFunction()).collect();
+               
+               //globally sorted iff the concatenated bounds are non-decreasing
+               long prev = Long.MIN_VALUE;
+               for( Long val : bounds ) {
+                       if( val < prev )
+                               return false;
+                       prev = val;
+               }
+               return true;
+       }
+
+       /**
+        * Per-partition sorting analysis over the keys of one partition.
+        * Emits nothing for an empty partition (no ordering constraint),
+        * [min, max] if the keys are in ascending order, and
+        * [Long.MAX_VALUE, Long.MIN_VALUE] otherwise; the latter pair is
+        * decreasing and hence always fails the global check in isSorted.
+        */
+       private static class SortingAnalysisFunction implements FlatMapFunction<Iterator<Long>,Long> 
+       {
+               private static final long serialVersionUID = -5789003262381127469L;
+
+               @Override
+               public Iterable<Long> call(Iterator<Long> arg0) throws Exception 
+               {
+                       ArrayList<Long> ret = new ArrayList<Long>();
+                       
+                       //empty partition: imposes no ordering constraint
+                       if( !arg0.hasNext() )
+                               return ret;
+                       
+                       long min = arg0.next();
+                       long max = min;
+                       while( arg0.hasNext() ) {
+                               long val = arg0.next();
+                               if( val < max ) {
+                                       //unsorted partition: emit decreasing marker pair
+                                       ret.add(Long.MAX_VALUE);
+                                       ret.add(Long.MIN_VALUE);
+                                       return ret;
+                               }
+                               max = val;
+                       }
+                       
+                       ret.add(min);
+                       ret.add(max);
+                       return ret;
+               }
+       }
+       
+       
        /////////////////////////////////
        // CSV-SPECIFIC FUNCTIONS
        
@@ -1087,7 +1129,14 @@ public class FrameRDDConverterUtils
        //////////////////////////////////////
        // Common functions
        
-       // Flushes current state of filled column blocks to output list.
+       /**
+        * Flushes current state of filled column blocks to output list.
+        * 
+        * @param ix
+        * @param fb
+        * @param ret
+        * @throws DMLRuntimeException
+        */
        private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, 
ArrayList<Tuple2<Long,FrameBlock>> ret ) 
                throws DMLRuntimeException
        {                       

Reply via email to