This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 4cebe8ed343801e565c769fc8208001251242970
Author: baunsgaard <[email protected]>
AuthorDate: Wed Oct 12 23:51:33 2022 +0200

    [SYSTEMDS-3444] Spark Write compressed format
    
    This commit fixes and extends support for Spark writes of the
    compressed format. Previous versions did not properly support the
    edge cases; this commit resolves those inconsistencies and allows
    local reading of folders containing sub blocks.
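    
    A minimal DML sketch of the write path this enables, mirroring the
    added WriteCompressedTest.dml test script (the file names here are
    illustrative):
    
        m = rand(rows=100, cols=100, min=0, max=10)  # generate a test matrix
        s = sum(m)                                   # checksum for validation
        write(m, "out.cla", format="compressed")     # write in compressed format
        write(s, "sum.scalar", format="csv")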
---
 .../org/apache/sysds/parser/DMLTranslator.java     |   6 +-
 .../org/apache/sysds/parser/DataExpression.java    |   2 +-
 .../sysds/runtime/compress/io/CompressWrap.java    |  36 ++++
 .../runtime/compress/io/ReaderCompressed.java      |  20 +-
 .../runtime/compress/io/WriterCompressed.java      |  18 ++
 .../runtime/compress/lib/CLALibDecompress.java     |  20 +-
 .../instructions/spark/WriteSPInstruction.java     |  24 ++-
 .../functions/ExtractBlockForBinaryReblock.java    |  19 +-
 .../spark/utils/RDDAggregateUtils.java             |  23 +-
 .../sysds/runtime/matrix/data/MatrixBlock.java     |   2 +
 ...OTestUtils.java => IOCompressionTestUtils.java} |   6 +-
 .../sysds/test/component/compress/io/IOEmpty.java  |  14 +-
 .../sysds/test/component/compress/io/IOSpark.java  | 240 ++++++++++++++++++---
 .../sysds/test/component/compress/io/IOTest.java   |  14 +-
 .../io/compressed/WriteCompressedTest.java         |  95 ++++++++
 .../io/compressed/WriteCompressedTest.dml          |  26 +++
 16 files changed, 485 insertions(+), 80 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index c196f780c3..315f54ff72 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -1053,20 +1053,18 @@ public class DMLTranslator
                                                case MM:
                                                case CSV:
                                                case LIBSVM:
+                                               case HDF5:
                                                        // write output in textcell format
                                                        ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
                                                        break;
                                                case BINARY:
+                                               case COMPRESSED:
                                                        // write output in binary block format
                                                        ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), ae.getBlocksize());
                                                        break;
                                                case FEDERATED:
                                                        ae.setOutputParams(ae.getDim1(), ae.getDim2(), -1, ae.getUpdateType(), -1);
                                                        break;
-                                               case HDF5:
-                                                       // write output in HDF5 format
-                                                       ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
-                                                       break;
                                                default:
                                                        throw new LanguageException("Unrecognized file format: " + ae.getFileFormat());
                                        }
diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java b/src/main/java/org/apache/sysds/parser/DataExpression.java
index c3d6edd5e3..a76aa3dd4c 100644
--- a/src/main/java/org/apache/sysds/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysds/parser/DataExpression.java
@@ -1327,7 +1327,7 @@ public class DataExpression extends DataIdentifier
                        //validate read filename
                        if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()))
                                getOutput().setBlocksize(-1);
-                       else if (getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase(FileFormat.BINARY.toString())) {
+                       else if (getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase(FileFormat.BINARY.toString()) || getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase(FileFormat.COMPRESSED.toString())) {
                                if( getVarParam(ROWBLOCKCOUNTPARAM)!=null )
                                        getOutput().setBlocksize(Integer.parseInt(getVarParam(ROWBLOCKCOUNTPARAM).toString()));
                                else
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java
new file mode 100644
index 0000000000..e3e370b694
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.io;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+public class CompressWrap implements Function<MatrixBlock, CompressedWriteBlock> {
+       private static final long serialVersionUID = 966405324406154236L;
+
+       public CompressWrap() {
+       }
+
+       @Override
+       public CompressedWriteBlock call(MatrixBlock arg0) throws Exception {
+               return new CompressedWriteBlock(arg0);
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
index 0b57a8ec02..eb5fa08d60 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
@@ -69,11 +69,23 @@ public final class ReaderCompressed extends MatrixReader {
 
        private static MatrixBlock readCompressedMatrix(Path path, JobConf job, FileSystem fs, int rlen, int clen, int blen)
                throws IOException {
-               final Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(path));
 
+               final Map<MatrixIndexes, MatrixBlock> data = new HashMap<>();
+
+               for(Path subPath : IOUtilFunctions.getSequenceFilePaths(fs, path))
+                       read(subPath, job, data);
+
+               if(data.size() == 1)
+                       return data.entrySet().iterator().next().getValue();
+               else
+                       return CLALibCombine.combine(data);
+       }
+
+       private static void read(Path path, JobConf job, Map<MatrixIndexes, MatrixBlock> data) throws IOException {
+
+               final Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(path));
                try {
                        // Materialize all sub blocks.
-                       Map<MatrixIndexes, MatrixBlock> data = new HashMap<>();
 
                        // Use write and read interface to read and write this object.
                        MatrixIndexes key = new MatrixIndexes();
@@ -84,10 +96,6 @@ public final class ReaderCompressed extends MatrixReader {
                                key = new MatrixIndexes();
                                value = new CompressedWriteBlock();
                        }
-                       if(data.size() == 1)
-                               return data.entrySet().iterator().next().getValue();
-                       else
-                               return CLALibCombine.combine(data);
                }
                finally {
                        IOUtilFunctions.closeSilently(reader);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index 7a3e07cf3c..207c6961fa 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -28,16 +28,22 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
+import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
+import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.io.MatrixWriter;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.HDFSTool;
 
 public final class WriterCompressed extends MatrixWriter {
@@ -62,6 +68,18 @@ public final class WriterCompressed extends MatrixWriter {
                create(null).writeMatrixToHDFS(src, fname, rlen, clen, blen, nnz, diag);
        }
 
+       public static void writeRDDToHDFS(JavaPairRDD<MatrixIndexes, MatrixBlock> src, String path, int blen,
+               DataCharacteristics mc) {
+               final DataCharacteristics outC = new MatrixCharacteristics(mc).setBlocksize(blen);
+               writeRDDToHDFS(RDDConverterUtils.binaryBlockToBinaryBlock(src, mc, outC), path);
+       }
+
+       public static void writeRDDToHDFS(JavaPairRDD<MatrixIndexes, MatrixBlock> src, String path) {
+               src.mapValues(new CompressionFunction()) // Try to compress each block.
+                       .mapValues(new CompressWrap()) // Wrap in writable
+                       .saveAsHadoopFile(path, MatrixIndexes.class, CompressedWriteBlock.class, SequenceFileOutputFormat.class);
+       }
+
        @Override
        public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag)
                throws IOException {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
index 30ea5de820..b4a38b1ce7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
@@ -61,6 +61,21 @@ public class CLALibDecompress {
 
        public static void decompressTo(CompressedMatrixBlock cmb, MatrixBlock ret, int rowOffset, int colOffset, int k) {
                Timing time = new Timing(true);
+               if(cmb.getNumColumns() + colOffset > ret.getNumColumns() || cmb.getNumRows() + rowOffset > ret.getNumRows()) {
+                       LOG.warn(
+                               "Slow slicing off excess parts for decompressTo because decompression into is implemented for fitting blocks");
+                       MatrixBlock mbSliced = cmb.slice( //
+                               Math.min(Math.abs(rowOffset), 0), Math.min(cmb.getNumRows(), ret.getNumRows() - rowOffset) - 1, // Rows
+                               Math.min(Math.abs(colOffset), 0), Math.min(cmb.getNumColumns(), ret.getNumColumns() - colOffset) - 1); // Cols
+                       if(!(mbSliced instanceof CompressedMatrixBlock)) {
+                               mbSliced.putInto(ret, rowOffset, colOffset, false);
+                               return;
+                       }
+
+                       cmb = (CompressedMatrixBlock) mbSliced;
+                       decompress(cmb, 1);
+               }
+
                final boolean outSparse = ret.isInSparseFormat();
                if(!cmb.isEmpty()) {
                        if(outSparse && cmb.isOverlapping())
@@ -78,8 +93,7 @@ public class CLALibDecompress {
                                LOG.trace("decompressed block w/ k=" + k + " in " + t + "ms.");
                }
 
-               if(ret.getNonZeros() <= 0)
-                       ret.setNonZeros(cmb.getNonZeros());
+               ret.recomputeNonZeros();
        }
 
        private static void decompressToSparseBlock(CompressedMatrixBlock cmb, MatrixBlock ret, int rowOffset,
@@ -96,6 +110,8 @@ public class CLALibDecompress {
                else
                        for(AColGroup g : groups)
                                g.decompressToSparseBlock(sb, 0, nRows, rowOffset, colOffset);
+               sb.sort();
+               ret.checkSparseRows();
        }
 
        private static void decompressToDenseBlock(CompressedMatrixBlock cmb, DenseBlock ret, int rowOffset, int colOffset) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index bad0d2fde5..97371a477f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -19,6 +19,10 @@
 
 package org.apache.sysds.runtime.instructions.spark;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.io.LongWritable;
@@ -31,6 +35,7 @@ import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.io.WriterCompressed;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -52,10 +57,6 @@ import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.HDFSTool;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Random;
-
 public class WriteSPInstruction extends SPInstruction implements LineageTraceable {
        public CPOperand input1 = null;
        private CPOperand input2 = null;
@@ -252,6 +253,21 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
                        if( nonDefaultBlen )
                                mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
                }
+               else if(fmt == FileFormat.COMPRESSED) {
+                       // reblock output if needed
+                       final int blen = Integer.parseInt(input4.getName());
+                       final boolean nonDefaultBlen = ConfigurationManager.getBlocksize() != blen;
+                       mc.setNonZeros(-1); // default to unknown non zeros for compressed matrix block
+
+                       if(nonDefaultBlen)
+                               WriterCompressed.writeRDDToHDFS(in1, fname, blen, mc);
+                       else
+                               WriterCompressed.writeRDDToHDFS(in1, fname);
+
+                       if(nonDefaultBlen)
+                               mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
+
+               }
                else if(fmt == FileFormat.LIBSVM) {
                        if(mc.getRows() == 0 || mc.getCols() == 0) {
                                throw new IOException(
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
index f9b735e39c..4f20216760 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -92,15 +91,19 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<
                                final int cixi = UtilFunctions.computeCellInBlock(rowLower, out_blen);
                                final int cixj = UtilFunctions.computeCellInBlock(colLower, out_blen);
 
-                               if(in instanceof CompressedMatrixBlock){
-                                       blk.allocateSparseRowsBlock(false);
-                                       CLALibDecompress.decompressTo((CompressedMatrixBlock) in, blk, cixi, cixj, 1);
-                               }
-                               else if( aligned ) {
-                                       blk.appendToSparse(in, cixi, cixj);
-                                       blk.setNonZeros(in.getNonZeros());
+                               if( aligned ) {
+                                       if(in instanceof CompressedMatrixBlock){
+                                               blk.allocateSparseRowsBlock(false);
+                                               CLALibDecompress.decompressTo((CompressedMatrixBlock) in, blk, cixi - aixi, cixj - aixj, 1);
+                                       }else{
+                                               blk.appendToSparse(in, cixi, cixj);
+                                               blk.setNonZeros(in.getNonZeros());
+                                       }
                                }
                                else { //general case
+                                       if(in instanceof CompressedMatrixBlock){
+                                               in = CompressedMatrixBlock.getUncompressed(in);
+                                       }
                                        for(int i2 = 0; i2 <= (int)(rowUpper-rowLower); i2++)
                                                for(int j2 = 0; j2 <= (int)(colUpper-colLower); j2++)
                                                        blk.appendValue(cixi+i2, cixj+j2, in.quickGetValue(aixi+i2, aixj+j2));
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 3a3e9602cb..22b313f8e6 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -704,32 +704,27 @@ public class RDDAggregateUtils
                }
                
                @Override
-               public MatrixBlock call(MatrixBlock b1, MatrixBlock b2)
-                       throws Exception
-               {
+               public MatrixBlock call(MatrixBlock b1, MatrixBlock b2) throws Exception {
                        long b1nnz = b1.getNonZeros();
                        long b2nnz = b2.getNonZeros();
-
+
                        // sanity check input dimensions
-                       if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
-                               throw new DMLRuntimeException("Mismatched block sizes for: "
-                                               + b1.getNumRows() + " " + b1.getNumColumns() + " "
-                                               + b2.getNumRows() + " " + b2.getNumColumns());
+                       if(b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
+                               throw new DMLRuntimeException("Mismatched block sizes for: " + b1.getNumRows() + " " + b1.getNumColumns()
+                                       + " " + b2.getNumRows() + " " + b2.getNumColumns());
                        }
 
                        // execute merge (never pass by reference)
                        MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
                        ret.merge(b2, false, false, _deep);
                        ret.examSparsity();
-
+
                        // sanity check output number of non-zeros
-                       if (ret.getNonZeros() != b1nnz + b2nnz) {
-                               throw new DMLRuntimeException("Number of non-zeros does not match: "
-                                               + ret.getNonZeros() + " != " + b1nnz + " + " + b2nnz);
+                       if(ret.getNonZeros() != b1nnz + b2nnz) {
+                               throw new DMLRuntimeException(
+                                       "Number of non-zeros does not match: " + ret.getNonZeros() + " != " + b1nnz + " + " + b2nnz);
                        }
-
                        return ret;
                }
-
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index ad0ca1c776..b69fb99873 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -1435,6 +1435,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                                for(int k = apos; k < apos + alen; k++)
                                        if(avals[k] == 0)
                                                throw new RuntimeException("Wrong sparse row: zero at " + k);
+                               if(aix[apos + alen - 1] >= clen)
+                                       throw new RuntimeException("Invalid offset outside of matrix");
                        }
        }
 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOTestUtils.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
similarity index 94%
rename from src/test/java/org/apache/sysds/test/component/compress/io/IOTestUtils.java
rename to src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
index fe5605ec31..785fc4444d 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOTestUtils.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
@@ -28,14 +28,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.sysds.runtime.compress.io.ReaderCompressed;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-public class IOTestUtils {
+public class IOCompressionTestUtils {
 
        final static Object lock = new Object();
 
        static final AtomicInteger id = new AtomicInteger(0);
 
        protected static void deleteDirectory(File file) {
-               synchronized(IOTestUtils.lock) {
+               synchronized(IOCompressionTestUtils.lock) {
                        for(File subfile : file.listFiles()) {
                                if(subfile.isDirectory())
                                        deleteDirectory(subfile);
@@ -59,7 +59,7 @@ public class IOTestUtils {
                // assertTrue("Disk size is not equivalent", a.getExactSizeOnDisk() > b.getExactSizeOnDisk());
        }
 
-       protected static MatrixBlock read(String path) {
+       public static MatrixBlock read(String path) {
                try {
                        return ReaderCompressed.readCompressedMatrixFromHDFS(path);
                }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java
index 5f913c70d3..41e4f2e904 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java
@@ -39,18 +39,18 @@ public class IOEmpty {
                + IOEmpty.class.getSimpleName() + "/";
 
        public IOEmpty() {
-               synchronized(IOTestUtils.lock) {
+               synchronized(IOCompressionTestUtils.lock) {
                        new File(nameBeginning).mkdirs();
                }
        }
 
        @AfterClass
        public static void cleanup() {
-               IOTestUtils.deleteDirectory(new File(nameBeginning));
+               IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
        }
 
        public static String getName() {
-               return IOTestUtils.getName(nameBeginning);
+               return IOCompressionTestUtils.getName(nameBeginning);
        }
 
        @Test
@@ -65,8 +65,8 @@ public class IOEmpty {
        public void writeEmptyAndRead() {
                String n = getName();
                write(n, 10, 10, 1000);
-               MatrixBlock mb = IOTestUtils.read(n);
-               IOTestUtils.verifyEquivalence(mb, new MatrixBlock(10, 10, 0.0));
+               MatrixBlock mb = IOCompressionTestUtils.read(n);
+               IOCompressionTestUtils.verifyEquivalence(mb, new MatrixBlock(10, 10, 0.0));
        }
 
        @Test
@@ -83,8 +83,8 @@ public class IOEmpty {
                write(n, 1000, 10, 100);
                File f = new File(n);
                assertTrue(f.isDirectory() || f.isFile());
-               MatrixBlock mb = IOTestUtils.read(n);
-               IOTestUtils.verifyEquivalence(mb, new MatrixBlock(1000, 10, 0.0));
+               MatrixBlock mb = IOCompressionTestUtils.read(n);
+               IOCompressionTestUtils.verifyEquivalence(mb, new MatrixBlock(1000, 10, 0.0));
        }
 
        protected static void write(String path, int nRows, int nCols, int blen) {
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java
index 61b699c254..847938bff3 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOSpark.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.test.component.compress.io;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
@@ -31,14 +32,18 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.compress.io.CompressUnwrap;
 import org.apache.sysds.runtime.compress.io.CompressedWriteBlock;
 import org.apache.sysds.runtime.compress.io.WriterCompressed;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysds.runtime.io.InputOutputInfo;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -54,11 +59,11 @@ public class IOSpark {
 
        @AfterClass
        public static void cleanup() {
-               IOTestUtils.deleteDirectory(new File(nameBeginning));
+               IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
        }
 
        private static String getName() {
-               return IOTestUtils.getName(nameBeginning);
+               return IOCompressionTestUtils.getName(nameBeginning);
        }
 
        @Test
@@ -109,38 +114,202 @@ public class IOSpark {
                readWrite(mb);
        }
 
-       private void readWrite(MatrixBlock mb) {
-               double sum = mb.sum();
-               String n = getName();
-               try {
-                       WriterCompressed.writeCompressedMatrixToHDFS(mb, n, 50);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+       @Test
+       public void writeSparkReadCPMultiColBlock() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 124, 1, 3, 1.0, 2514));
+               testWriteSparkReadCP(mb, 100, 100);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiRowBlock() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(1322, 33, 1, 3, 1.0, 2514));
+               testWriteSparkReadCP(mb, 100, 100);
+       }
+
+       @Test
+       public void writeSparkReadCPSingleBlock() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 99, 1, 3, 1.0, 33));
+               testWriteSparkReadCP(mb, 100, 100);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiBlock() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(580, 244, 1, 3, 1.0, 33));
+               testWriteSparkReadCP(mb, 100, 100);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiColBlockReblockUp() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 124, 1, 3, 1.0, 2514));
+               testWriteSparkReadCP(mb, 100, 150);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiRowBlockReblockUp() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(1322, 33, 1, 3, 1.0, 2514));
+               testWriteSparkReadCP(mb, 100, 150);
+       }
+
+       @Test
+       public void writeSparkReadCPSingleBlockReblockUp() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 99, 1, 3, 1.0, 33));
+               testWriteSparkReadCP(mb, 100, 150);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiBlockReblockUp() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(580, 244, 1, 3, 1.0, 33));
+               testWriteSparkReadCP(mb, 100, 150);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiColBlockReblockDown() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 124, 1, 3, 1.0, 2514));
+               testWriteSparkReadCP(mb, 100, 80);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiRowBlockReblockDown() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(1322, 33, 1, 3, 1.0, 2514));
+               testWriteSparkReadCP(mb, 100, 80);
+       }
+
+       @Test
+       public void writeSparkReadCPSingleBlockReblockDown() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 99, 1, 3, 1.0, 33));
+               testWriteSparkReadCP(mb, 100, 80);
+       }
+
+       @Test
+       public void writeSparkReadCPMultiBlockReblockDown() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(580, 244, 1, 3, 1.0, 33));
+               testWriteSparkReadCP(mb, 100, 80);
+       }
+
+       private void testWriteSparkReadCP(MatrixBlock mb, int blen1, int blen2) throws Exception {
+
+               String f1 = getName();
+               WriterCompressed.writeCompressedMatrixToHDFS(mb, f1, blen1);
+
+               // Make sure the first file is written
+               File f = new File(f1);
+               assertTrue(f.isFile() || f.isDirectory());
+               // Read in again as RDD
+               JavaPairRDD<MatrixIndexes, MatrixBlock> m = getRDD(f1);
+               String f2 = getName(); // get new name for writing RDD.
+
+               // Write RDD to disk
+               if(blen1 != blen2) {
+                       DataCharacteristics mc = new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), blen1);
+                       WriterCompressed.writeRDDToHDFS(m, f2, blen2, mc);
                }
-               verifySum(read(n), sum, 0.0001);
+               else
+                       WriterCompressed.writeRDDToHDFS(m, f2);
+
+               // Read locally the spark block written.
+               MatrixBlock mbr = IOCompressionTestUtils.read(f2);
+               IOCompressionTestUtils.verifyEquivalence(mb, mbr);
+       }
 
-       private void verifySum(List<Tuple2<MatrixIndexes, MatrixBlock>> c, double val, double tol) {
-               double sum = 0.0;
-               for(Tuple2<MatrixIndexes, MatrixBlock> b : c)
-                       sum += b._2().sum();
-               assertEquals(val, sum, tol);
+       @Test
+       public void testReblock_up() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 50, 1, 3, 1.0, 2514));
+               testReblock(mb, 25, 50);
        }
 
-       private List<Tuple2<MatrixIndexes, MatrixBlock>> read(String n) {
-               JavaPairRDD<MatrixIndexes, MatrixBlock> m = getRDD(n).mapValues(x -> x.get());
-               return m.collect();
+       @Test
+       public void testReblock_up_2() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 50, 1, 3, 1.0, 2514));
+               testReblock(mb, 25, 55);
+       }
+
+       @Test
+       public void testReblock_up_3() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(165, 110, 1, 3, 1.0, 2514));
+               testReblock(mb, 25, 55);
+       }
+
+       @Test
+       public void testReblock_up_4() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(165, 110, 1, 3, 1.0, 2514));
+               testReblock(mb, 25, 100);
+       }
+
+       @Test
+       public void testReblock_up_5() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(230, 401, 1, 3, 1.0, 2514));
+               testReblock(mb, 25, 100);
+       }
+
+       @Test
+       public void testReblock_down() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 50, 1, 3, 1.0, 2514));
+               testReblock(mb, 50, 25);
+       }
+
+       @Test
+       public void testReblock_down_2() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(50, 50, 1, 3, 1.0, 2514));
+               testReblock(mb, 55, 25);
+       }
+
+       @Test
+       public void testReblock_down_3() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(165, 110, 1, 3, 1.0, 2514));
+               testReblock(mb, 55, 25);
+       }
+
+       @Test
+       public void testReblock_down_4() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(165, 110, 1, 3, 1.0, 2514));
+               testReblock(mb, 100, 25);
+       }
+
+       @Test
+       public void testReblock_down_5() throws Exception {
+               MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(230, 401, 1, 3, 1.0, 2514));
+               testReblock(mb, 100, 25);
+       }
+
+       private void testReblock(MatrixBlock mb, int blen1, int blen2) throws Exception {
+               String f1 = getName();
+               WriterCompressed.writeCompressedMatrixToHDFS(mb, f1, blen1);
+
+               // Read in again as RDD
+               JavaPairRDD<MatrixIndexes, MatrixBlock> m = getRDD(f1); // Our starting point
 
+               int nBlocksExpected = (1 + (mb.getNumColumns() - 1) / blen1) * (1 + (mb.getNumRows() - 1) / blen1);
+               int nBlocksActual = m.collect().size();
+               assertEquals("Expected same number of blocks ", nBlocksExpected, nBlocksActual);
+
+               DataCharacteristics mc = new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), blen1);
+               JavaPairRDD<MatrixIndexes, MatrixBlock> m2 = reblock(m, mc, blen2);
+               int nBlocksExpected2 = (1 + (mb.getNumColumns() - 1) / blen2) * (1 + (mb.getNumRows() - 1) / blen2);
+               int nBlocksActual2 = m2.collect().size();
+               assertEquals("Expected same number of blocks on re-blocked", nBlocksExpected2, nBlocksActual2);
+
+               double val = mb.sum();
+               verifySum(m, val, 0.0000001);
+               verifySum(m2, val, 0.0000001);
+
+       }
+
+       private static JavaPairRDD<MatrixIndexes, MatrixBlock> reblock(JavaPairRDD<MatrixIndexes, MatrixBlock> in,
+               DataCharacteristics mc, int blen) {
+               final DataCharacteristics outC = new MatrixCharacteristics(mc).setBlocksize(blen);
+               return RDDConverterUtils.binaryBlockToBinaryBlock(in, mc, outC);
+       }
+
+       private List<Tuple2<MatrixIndexes, MatrixBlock>> read(String n) {
+               return getRDD(n).collect();
        }
 
        @SuppressWarnings({"unchecked"})
-       private JavaPairRDD<MatrixIndexes, CompressedWriteBlock> getRDD(String path) {
+       private JavaPairRDD<MatrixIndexes, MatrixBlock> getRDD(String path) {
                InputOutputInfo inf = InputOutputInfo.CompressedInputOutputInfo;
                JavaSparkContext sc = SparkExecutionContext.getSparkContextStatic();
-               return (JavaPairRDD<MatrixIndexes, CompressedWriteBlock>) sc.hadoopFile(path, inf.inputFormatClass, inf.keyClass,
-                       inf.valueClass);
+               return ((JavaPairRDD<MatrixIndexes, CompressedWriteBlock>) sc.hadoopFile(path, inf.inputFormatClass, inf.keyClass,
+                       inf.valueClass)).mapValues(new CompressUnwrap());
        }
 
        @SuppressWarnings({"unchecked"})
@@ -158,13 +327,36 @@ public class IOSpark {
 
                        JavaPairRDD<MatrixIndexes, MatrixBlock> m = (JavaPairRDD<MatrixIndexes, MatrixBlock>) ec
                                .getRDDHandleForMatrixObject(obj, fmt);
+                       List<Tuple2<MatrixIndexes, MatrixBlock>> c = m.collect();
+                       verifySum(c, mb.sum(), 0.0001);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
 
-                       verifySum(m.collect(), mb.sum(), 0.0001);
+       private void readWrite(MatrixBlock mb) {
+               double sum = mb.sum();
+               String n = getName();
+               try {
+                       WriterCompressed.writeCompressedMatrixToHDFS(mb, n, 50);
                }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
+               verifySum(read(n), sum, 0.0001);
+       }
+
+       private void verifySum(JavaPairRDD<MatrixIndexes, MatrixBlock> m, double val, double tol) {
+               verifySum(m.collect(), val, tol);
        }
 
+       private void verifySum(List<Tuple2<MatrixIndexes, MatrixBlock>> c, double val, double tol) {
+               double sum = 0.0;
+               for(Tuple2<MatrixIndexes, MatrixBlock> b : c)
+                       sum += b._2().sum();
+               assertEquals(val, sum, tol);
+       }
 }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
index 90b23e49ab..11f28b6fd5 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
@@ -41,18 +41,18 @@ public class IOTest {
                + IOTest.class.getSimpleName() + "/";
 
        public IOTest() {
-               synchronized(IOTestUtils.lock) {
+               synchronized(IOCompressionTestUtils.lock) {
                        new File(nameBeginning).mkdirs();
                }
        }
 
        @AfterClass
        public static void cleanup() {
-               IOTestUtils.deleteDirectory(new File(nameBeginning));
+               IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
        }
 
        public static String getName() {
-               return IOTestUtils.getName(nameBeginning);
+               return IOCompressionTestUtils.getName(nameBeginning);
        }
 
        @Test
@@ -101,8 +101,8 @@ public class IOTest {
                WriterCompressed.writeCompressedMatrixToHDFS(mb, filename);
                File f = new File(filename);
                assertTrue(f.isFile() || f.isDirectory());
-               MatrixBlock mbr = IOTestUtils.read(filename);
-               IOTestUtils.verifyEquivalence(mb, mbr);
+               MatrixBlock mbr = IOCompressionTestUtils.read(filename);
+               IOCompressionTestUtils.verifyEquivalence(mb, mbr);
        }
 
        protected static void write(MatrixBlock src, String path) {
@@ -145,7 +145,7 @@ public class IOTest {
                WriterCompressed.writeCompressedMatrixToHDFS(mb, filename, blen);
                File f = new File(filename);
                assertTrue(f.isFile() || f.isDirectory());
-               MatrixBlock mbr = IOTestUtils.read(filename);
-               IOTestUtils.verifyEquivalence(mb, mbr);
+               MatrixBlock mbr = IOCompressionTestUtils.read(filename);
+               IOCompressionTestUtils.verifyEquivalence(mb, mbr);
        }
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/io/compressed/WriteCompressedTest.java b/src/test/java/org/apache/sysds/test/functions/io/compressed/WriteCompressedTest.java
new file mode 100644
index 0000000000..ba9e6f430f
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/compressed/WriteCompressedTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io.compressed;
+
+import java.io.IOException;
+
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.component.compress.io.IOCompressionTestUtils;
+import org.junit.Test;
+
+/**
+ * JUnit test cases to evaluate the functionality of writing compressed matrices.
+ * 
+ * Each test writes a random matrix in compressed format via DML, reads the result back, and compares the sums.
+ *
+ */
+
+public class WriteCompressedTest extends AutomatedTestBase {
+
+       private final static String TEST_NAME = "WriteCompressedTest";
+       private final static String TEST_DIR = "functions/io/compressed/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + WriteCompressedTest.class.getSimpleName() + "/";
+
+       private final static double eps = 1e-9;
+
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Rout"}));
+       }
+
+       @Test
+       public void testCP() throws IOException {
+               runWriteTest(ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       public void testHP() throws IOException {
+               runWriteTest(ExecMode.HYBRID);
+       }
+
+       @Test
+       public void testSP() throws IOException {
+               runWriteTest(ExecMode.SPARK);
+       }
+
+       private void runWriteTest(ExecMode platform) throws IOException {
+               runWriteTest(platform, 100, 100, 0, 0, 0.0);
+       }
+
+       private void runWriteTest(ExecMode platform, int rows, int cols, int min, int max, double sparsity)
+               throws IOException {
+
+               ExecMode oldPlatform = rtplatform;
+               rtplatform = platform;
+
+               TestConfiguration config = getTestConfiguration(TEST_NAME);
+               loadTestConfiguration(config);
+
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-explain", "-args", "" + rows, "" 
+ cols, "" + min, "" + max, "" + sparsity,
+                       output("out.cla"), output("sum.scalar")};
+
+               runTest(null);
+
+               double sumDML = TestUtils.readDMLScalar(output("sum.scalar"));
+               MatrixBlock mbr = IOCompressionTestUtils.read(output("out.cla"));
+               
+               TestUtils.compareScalars(sumDML, mbr.sum(), eps);
+
+               rtplatform = oldPlatform;
+       }
+}
diff --git a/src/test/scripts/functions/io/compressed/WriteCompressedTest.dml b/src/test/scripts/functions/io/compressed/WriteCompressedTest.dml
new file mode 100644
index 0000000000..6a51041016
--- /dev/null
+++ b/src/test/scripts/functions/io/compressed/WriteCompressedTest.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+m = rand(rows=$1, cols=$2, min=$3, max=$4, sparsity=$5)
+
+s = sum(m)
+write(m, $6, format="compressed")
+write(s, $7, format="csv")

