Repository: incubator-systemml
Updated Branches:
  refs/heads/master 29d62a8a7 -> bcd71d346


[SYSTEMML-859] Fix missing header support in spark csv reblock 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2455b653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2455b653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2455b653

Branch: refs/heads/master
Commit: 2455b653245e81739173a7bfb39ebb7d55b9ceff
Parents: 29d62a8
Author: Matthias Boehm <[email protected]>
Authored: Thu Aug 11 19:19:55 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Aug 12 09:49:55 2016 -0700

----------------------------------------------------------------------
 .../instructions/spark/utils/RDDConverterUtils.java     | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2455b653/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 3867dac..3998a3f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -200,7 +200,7 @@ public class RDDConverterUtils
                //convert csv rdd to binary block rdd (w/ partial blocks)
                JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
                                prepinput.mapPartitionsToPair(
-                                       new CSVToBinaryBlockFunction(mcOut, 
delim, fill, fillValue));
+                                       new CSVToBinaryBlockFunction(mcOut, 
hasHeader, delim, fill, fillValue));
                
                //aggregate partial matrix blocks
                out = RDDAggregateUtils.mergeByKey( out ); 
@@ -522,16 +522,18 @@ public class RDDConverterUtils
                private long _clen = -1;
                private int _brlen = -1;
                private int _bclen = -1;
+               private boolean _header = false;
                private String _delim = null;
                private boolean _fill = false;
                private double _fillValue = 0;
                
-               public CSVToBinaryBlockFunction(MatrixCharacteristics mc, 
String delim, boolean fill, double fillValue)
+               public CSVToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean hasHeader, String delim, boolean fill, double fillValue)
                {
                        _rlen = mc.getRows();
                        _clen = mc.getCols();
                        _brlen = mc.getRowsPerBlock();
                        _bclen = mc.getColsPerBlock();
+                       _header = hasHeader;
                        _delim = delim;
                        _fill = fill;
                        _fillValue = fillValue;
@@ -551,7 +553,11 @@ public class RDDConverterUtils
                        {
                                Tuple2<Text,Long> tmp = arg0.next();
                                String row = tmp._1().toString();
-                               long rowix = tmp._2() + 1;
+                               long rowix = tmp._2() + (_header ? 0 : 1);
+                               
+                               //skip existing header
+                               if( _header && rowix == 0  ) 
+                                       continue;
                                
                                long rix = 
UtilFunctions.computeBlockIndex(rowix, _brlen);
                                int pos = 
UtilFunctions.computeCellInBlock(rowix, _brlen);

Reply via email to