Repository: incubator-systemml
Updated Branches:
  refs/heads/master 8a0df5b85 -> eebb9665a


[SYSTEMML-562] Spark frame csv reblock instruction, tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eebb9665
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eebb9665
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eebb9665

Branch: refs/heads/master
Commit: eebb9665a8b442ba0f6a946ffd3396ca5905c893
Parents: 8a0df5b
Author: Matthias Boehm <[email protected]>
Authored: Sat Jun 11 20:40:04 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sat Jun 11 20:40:04 2016 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |  8 +++
 .../rewrite/RewriteBlockSizeAndReblock.java     |  5 +-
 .../spark/CSVReblockSPInstruction.java          | 68 ++++++++++++++++----
 .../instructions/spark/WriteSPInstruction.java  |  4 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 32 ++++-----
 .../functions/frame/FrameConverterTest.java     |  7 +-
 .../functions/frame/FrameMatrixReblockTest.java | 39 ++++++-----
 7 files changed, 109 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 833f5ea..aa9f7d0 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -195,6 +195,14 @@ public class OptimizerUtils
         */
        public static final boolean ALLOW_COMBINE_FILE_INPUT_FORMAT = true;
        
+       /**
+        * Enables automatic csv-binary block reblock.
+        * 
+        * TODO enable by default and remove once file-based transform 
completely
+        * removed via frame-based transform/transformapply 
+        */
+       public static boolean ALLOW_FRAME_CSV_REBLOCK = false;
+       
        
        //////////////////////
        // Optimizer levels //

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
index 92dcd69..2b875a1 100644
--- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
+++ 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
@@ -93,9 +93,10 @@ public class RewriteBlockSizeAndReblock extends 
HopRewriteRule
                if (hop instanceof DataOp) 
                {
                        // if block size does not match
-                       if( canReblock //TODO change frame condition to != 
BINARY once transform over frames supported
+                       if( canReblock 
                                && ((hop.getDataType() == DataType.MATRIX && 
(hop.getRowsInBlock() != GLOBAL_BLOCKSIZE || hop.getColsInBlock() != 
GLOBAL_BLOCKSIZE)
-                                 ||(hop.getDataType() == DataType.FRAME && 
OptimizerUtils.isSparkExecutionMode() && 
((DataOp)hop).getInputFormatType()==FileFormatTypes.TEXT)))) 
+                                 ||(hop.getDataType() == DataType.FRAME && 
OptimizerUtils.isSparkExecutionMode() && 
(((DataOp)hop).getInputFormatType()==FileFormatTypes.TEXT
+                                                 || 
((DataOp)hop).getInputFormatType()==FileFormatTypes.CSV && 
OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK))))) 
                        {
                                if (((DataOp) hop).getDataOpType() == 
DataOp.DataOpTypes.PERSISTENTREAD) 
                                {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
index 98cc5a0..44a076d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -22,17 +22,19 @@ package org.apache.sysml.runtime.instructions.spark;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
-
 import org.apache.sysml.hops.recompile.Recompiler;
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -86,15 +88,14 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public void processInstruction(ExecutionContext ec)
                throws DMLRuntimeException 
        {
                SparkExecutionContext sec = (SparkExecutionContext) ec;
 
                //sanity check input info
-               MatrixObject mo = sec.getMatrixObject(input1.getName());
-               MatrixFormatMetaData iimd = (MatrixFormatMetaData) 
mo.getMetaData();
+               CacheableData<?> obj = sec.getCacheableData(input1.getName());
+               MatrixFormatMetaData iimd = (MatrixFormatMetaData) 
obj.getMetaData();
                if (iimd.getInputInfo() != InputInfo.CSVInputInfo) {
                        throw new DMLRuntimeException("The given InputInfo is 
not implemented for "
                                        + "CSVReblockSPInstruction:" + 
iimd.getInputInfo());
@@ -107,24 +108,67 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction
 
                //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction)
                if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-                       Recompiler.executeInMemoryMatrixReblock(sec, 
input1.getName(), output.getName());
+                       if( input1.getDataType() == DataType.MATRIX )
+                               Recompiler.executeInMemoryMatrixReblock(sec, 
input1.getName(), output.getName());
+                       else if( input1.getDataType() == DataType.FRAME )
+                               Recompiler.executeInMemoryFrameReblock(sec, 
input1.getName(), output.getName());
                        return;
                }
                
                //check jdk version (prevent double.parseDouble contention on 
<jdk8)
                sec.checkAndRaiseValidationWarningJDKVersion();
                
+               //execute matrix/frame csvreblock 
+               JavaPairRDD<?,?> out = null;
+               if( input1.getDataType() == DataType.MATRIX )
+                       out = processMatrixCSVReblockInstruction(sec, mcOut);
+               else if( input1.getDataType() == DataType.FRAME )
+                       out = processFrameCSVReblockInstruction(sec, mcOut);
+                       
+               // put output RDD handle into symbol table
+               sec.setRDDHandleForVariable(output.getName(), out);
+               sec.addLineageRDD(output.getName(), input1.getName());
+       }
+       
+       /**
+        * Reblocks this instruction's CSV text input into a binary-block matrix RDD.
+        * @param sec spark execution context providing the input RDD handle and spark context
+        * @param mcOut output matrix characteristics, forwarded to the csv converter
+        * @return binary-block matrix RDD of (MatrixIndexes, MatrixBlock) pairs
+        * @throws DMLRuntimeException if obtaining the input RDD or the csv conversion fails
+        */
+       @SuppressWarnings("unchecked")
+       protected JavaPairRDD<MatrixIndexes,MatrixBlock> 
processMatrixCSVReblockInstruction(SparkExecutionContext sec, 
MatrixCharacteristics mcOut) 
+               throws DMLRuntimeException
+       {
                //get input rdd (needs to be longwritable/text for consistency 
with meta data, in case of
                //serialization issues create longwritableser/textser as 
serializable wrappers
                JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, 
Text>) 
-                               sec.getRDDHandleForVariable(input1.getName(), 
iimd.getInputInfo());
+                               sec.getRDDHandleForVariable(input1.getName(), 
InputInfo.CSVInputInfo);
                        
                //reblock csv to binary block
-               JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
RDDConverterUtils.csvToBinaryBlock(
-                               sec.getSparkContext(), in, mcOut, _hasHeader, 
_delim, _fill, _fillValue);
+               return 
RDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), 
+                               in, mcOut, _hasHeader, _delim, _fill, 
_fillValue);
+       }
+       
+       /**
+        * Reblocks this instruction's CSV text input into a binary-block frame RDD keyed by Long.
+        * NOTE(review): the frame csv converter does not appear to forward fillValue to
+        * CSVToBinaryBlockFunction, so _fillValue may be ignored for frames -- confirm.
+        * @param sec spark execution context providing the input RDD handle and spark context
+        * @param mcOut output characteristics, forwarded to the frame csv converter
+        * @return binary-block frame RDD of (Long, FrameBlock) pairs
+        * @throws DMLRuntimeException if obtaining the input RDD or the csv conversion fails
+        */
+       @SuppressWarnings("unchecked")
+       protected JavaPairRDD<Long,FrameBlock> 
processFrameCSVReblockInstruction(SparkExecutionContext sec, 
MatrixCharacteristics mcOut) 
+               throws DMLRuntimeException
+       {
+               //get input rdd (needs to be longwritable/text for consistency 
with meta data, in case of
+               //serialization issues create longwritableser/textser as 
serializable wrappers
+               JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, 
Text>) 
+                               sec.getRDDHandleForVariable(input1.getName(), 
InputInfo.CSVInputInfo);
                
-               // put output RDD handle into symbol table
-               sec.setRDDHandleForVariable(output.getName(), out);
-               sec.addLineageRDD(output.getName(), input1.getName());
+               //reblock csv to binary block
+               return 
FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), 
+                               in, mcOut, _hasHeader, _delim, _fill, 
_fillValue);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 682e0b7..f42c29e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -38,7 +38,7 @@ import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -302,7 +302,7 @@ public class WriteSPInstruction extends SPInstruction
                }
                else if( oi == OutputInfo.BinaryBlockOutputInfo ) 
                {
-                       JavaPairRDD<LongWritable,FrameBlock> out = 
in1.mapToPair(new LongFrameBlockToLongWritableFrameBlock());
+                       JavaPairRDD<LongWritable,FrameBlock> out = 
in1.mapToPair(new LongFrameToLongWritableFrameFunction());
                        out.saveAsHadoopFile(fname, LongWritable.class, 
FrameBlock.class, SequenceFileOutputFormat.class);
                }
                else {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 7c8c08b..b2b1a97 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -75,7 +75,7 @@ public class FrameRDDConverterUtils
         * @return
         * @throws DMLRuntimeException
         */
-       public static JavaPairRDD<LongWritable, FrameBlock> 
csvToBinaryBlock(JavaSparkContext sc,
+       public static JavaPairRDD<Long, FrameBlock> 
csvToBinaryBlock(JavaSparkContext sc,
                        JavaPairRDD<LongWritable, Text> input, 
MatrixCharacteristics mcOut, 
                        boolean hasHeader, String delim, boolean fill, double 
fillValue) 
                throws DMLRuntimeException 
@@ -94,9 +94,9 @@ public class FrameRDDConverterUtils
                                .zipWithIndex(); //zip row index
                
                //convert csv rdd to binary block rdd (w/ partial blocks)
-               JavaPairRDD<LongWritable, FrameBlock> out = 
-                               prepinput.mapPartitionsToPair(
-                                       new CSVToBinaryBlockFunction(mcOut, 
hasHeader, delim, fill));
+               JavaPairRDD<Long, FrameBlock> out = prepinput
+                               .mapPartitionsToPair(new 
CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill))
+                               .mapToPair(new 
LongWritableFrameToLongFrameFunction());
                
                return out;
        }
@@ -112,7 +112,7 @@ public class FrameRDDConverterUtils
         * @return
         * @throws DMLRuntimeException
         */
-       public static JavaPairRDD<LongWritable, FrameBlock> 
csvToBinaryBlock(JavaSparkContext sc,
+       public static JavaPairRDD<Long, FrameBlock> 
csvToBinaryBlock(JavaSparkContext sc,
                        JavaRDD<String> input, MatrixCharacteristics mcOut, 
                        boolean hasHeader, String delim, boolean fill, double 
fillValue) 
                throws DMLRuntimeException 
@@ -340,34 +340,30 @@ public class FrameRDDConverterUtils
        }
        
        /**
-        *
+        * 
         */
-       public static class LongFrameBlockToLongWritableFrameBlock implements 
PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> 
+       public static class LongFrameToLongWritableFrameFunction implements 
PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> 
        {
-               private static final long serialVersionUID = 
3201887196237766424L;
+               private static final long serialVersionUID = 
-1467314923206783333L;
 
                @Override
                public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, 
FrameBlock> arg0) throws Exception  {
-                       return new Tuple2<LongWritable,FrameBlock>(new 
LongWritable(arg0._1), arg0._2);
+                       return new Tuple2<LongWritable, FrameBlock>(new 
LongWritable(arg0._1), arg0._2);
                }
        }
-       
-       
-       
+
        /**
         * 
         */
-       public static class LongFrameToLongWritableFrameFunction implements 
PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> 
+       public static class LongWritableFrameToLongFrameFunction implements 
PairFunction<Tuple2<LongWritable,FrameBlock>,Long,FrameBlock> 
        {
-
-               private static final long serialVersionUID = 
-1467314923206783333L;
+               private static final long serialVersionUID = 
-1232439643533739078L;
 
                @Override
-               public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, 
FrameBlock> arg0) throws Exception  {
-                       return new Tuple2<LongWritable, FrameBlock>(new 
LongWritable(arg0._1), arg0._2);
+               public Tuple2<Long, FrameBlock> call(Tuple2<LongWritable, 
FrameBlock> arg0) throws Exception  {
+                       return new Tuple2<Long, FrameBlock>(arg0._1.get(), 
arg0._2);
                }
        }
-
        
        /**
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index d0a7f88..e271824 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -38,7 +38,7 @@ import 
org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.io.FrameWriter;
@@ -440,7 +440,8 @@ public class FrameConverterTest extends AutomatedTestBase
                                OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
                                JavaPairRDD<LongWritable,Text> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, 
iinfo.inputValueClass);
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils
-                                               .csvToBinaryBlock(sc, rddIn, 
mc, false, ",", false, 0);
+                                               .csvToBinaryBlock(sc, rddIn, 
mc, false, ",", false, 0)
+                                               .mapToPair(new 
LongFrameToLongWritableFrameFunction());
                                rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
                                break;
                        }
@@ -459,7 +460,7 @@ public class FrameConverterTest extends AutomatedTestBase
                                JavaPairRDD<LongWritable,Text> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, 
iinfo.inputValueClass);
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils
                                                .textCellToBinaryBlock(sc, 
rddIn, mc, schema)
-                                               .mapToPair(new 
LongFrameBlockToLongWritableFrameBlock());
+                                               .mapToPair(new 
LongFrameToLongWritableFrameFunction());
                                rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
                                break;
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
index 7da887e..ecc958b 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.io.FrameWriter;
@@ -99,11 +100,10 @@ public class FrameMatrixReblockTest extends 
AutomatedTestBase
                runFrameReblockTest(TEST_NAME1, false, false, "text", 
ExecType.SPARK);
        }
 
-//TODO enable csv spark tests once transform over frame supported      
-//     @Test
-//     public void testFrameWriteSingleDenseCsvSpark() {
-//             runFrameReblockTest(TEST_NAME1, false, false, "csv", 
ExecType.SPARK);
-//     }
+       @Test
+       public void testFrameWriteSingleDenseCsvSpark() {
+               runFrameReblockTest(TEST_NAME1, false, false, "csv", 
ExecType.SPARK);
+       }
        
        @Test
        public void testFrameWriteMultipleDenseBinarySpark() {
@@ -115,10 +115,10 @@ public class FrameMatrixReblockTest extends 
AutomatedTestBase
                runFrameReblockTest(TEST_NAME1, true, false, "text", 
ExecType.SPARK);
        }
        
-//     @Test
-//     public void testFrameWriteMultipleDenseCsvSpark() {
-//             runFrameReblockTest(TEST_NAME1, true, false, "csv", 
ExecType.SPARK);
-//     }
+       @Test
+       public void testFrameWriteMultipleDenseCsvSpark() {
+               runFrameReblockTest(TEST_NAME1, true, false, "csv", 
ExecType.SPARK);
+       }
        
        @Test
        public void testFrameWriteSingleSparseBinaryCP() {
@@ -160,10 +160,10 @@ public class FrameMatrixReblockTest extends 
AutomatedTestBase
                runFrameReblockTest(TEST_NAME1, false, true, "text", 
ExecType.SPARK);
        }
        
-//     @Test
-//     public void testFrameWriteSingleSparseCsvSpark() {
-//             runFrameReblockTest(TEST_NAME1, false, true, "csv", 
ExecType.SPARK);
-//     }
+       @Test
+       public void testFrameWriteSingleSparseCsvSpark() {
+               runFrameReblockTest(TEST_NAME1, false, true, "csv", 
ExecType.SPARK);
+       }
        
        @Test
        public void testFrameWriteMultipleSparseBinarySpark() {
@@ -175,10 +175,10 @@ public class FrameMatrixReblockTest extends 
AutomatedTestBase
                runFrameReblockTest(TEST_NAME1, true, true, "text", 
ExecType.SPARK);
        }
        
-//     @Test
-//     public void testFrameWriteMultipleSparseCsvSpark() {
-//             runFrameReblockTest(TEST_NAME1, true, true, "csv", 
ExecType.SPARK);
-//     }
+       @Test
+       public void testFrameWriteMultipleSparseCsvSpark() {
+               runFrameReblockTest(TEST_NAME1, true, true, "csv", 
ExecType.SPARK);
+       }
        
        /**
         * 
@@ -201,6 +201,10 @@ public class FrameMatrixReblockTest extends 
AutomatedTestBase
                if( rtplatform == RUNTIME_PLATFORM.SPARK )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
                
+               boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
+               if( ofmt.equals("csv") )
+                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
+               
                try
                {
                        int cols = multColBlks ? cols2 : cols1;
@@ -231,6 +235,7 @@ public class FrameMatrixReblockTest extends 
AutomatedTestBase
                finally {
                        rtplatform = platformOld;
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
                }
        }
        

Reply via email to