This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d104a  [SYSTEMDS-3104] Fix parallel csv matrix reader for large dense blocks
f8d104a is described below

commit f8d104ae6be2965e3c3d21327dd0c536f22f9900
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Aug 26 16:30:45 2021 +0200

    [SYSTEMDS-3104] Fix parallel csv matrix reader for large dense blocks
    
    A previous commit a few months ago tried to optimize the performance of
    the parallel CSV reader, but broke the existing support for reading
    large dense blocks (>16GB). This patch fixes the issue in a minimally
    invasive manner.
---
 .../apache/sysds/runtime/data/DenseBlockLDRB.java  |  2 +-
 .../sysds/runtime/io/ReaderTextCSVParallel.java    | 87 +++++++++++-----------
 2 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java 
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
index e6c8a7b..2da57e0 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
@@ -72,7 +72,7 @@ public abstract class DenseBlockLDRB extends DenseBlock
                        int lastBlockSize = (newBlockSize == rlen ? 
newBlockSize : rlen % newBlockSize) * odims[0];
                        allocateBlocks(numBlocks);
                        IntStream.range(0, numBlocks)
-                                       .forEach((i) -> {
+                                       .forEach(i -> {
                                                int length = (i == numBlocks - 
1 ? lastBlockSize : newBlockSize *  _odims[0]);
                                                allocateBlock(i, length);
                                                if (v != 0)
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
index 6d879d1..5e8c5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.io.IOUtilFunctions.CountRowsTask;
@@ -93,7 +94,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
                // allocate output matrix block
                // First Read Pass (count rows/cols, determine offsets, 
allocate matrix block)
-               MatrixBlock ret = 
computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, estnnz);
+               MatrixBlock ret = 
computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen, 
estnnz);
 
                // Second Read Pass (read, parse strings, append to matrix 
block)
                readCSVMatrixFromHDFS(splits, path, ret);
@@ -156,8 +157,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
                }
        }
 
-       private MatrixBlock 
computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path, long 
rlen, long clen,
-               long estnnz) throws IOException, DMLRuntimeException {
+       private MatrixBlock 
computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits,
+               Path path, long rlen, long clen, int blen, long estnnz) throws 
IOException, DMLRuntimeException {
                _rLen = 0;
                _cLen = 0;
 
@@ -225,7 +226,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
                // allocate target matrix block based on given size;
                // need to allocate sparse as well since lock-free insert into 
target
                long estnnz2 = (estnnz < 0) ? (long) _rLen * _cLen : estnnz;
-               return createOutputMatrixBlock(_rLen, _cLen, _rLen, estnnz2, 
true, true);
+               return createOutputMatrixBlock(_rLen, _cLen, blen, estnnz2, 
true, true);
        }
 
        private static class SplitOffsetInfos {
@@ -262,8 +263,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
                protected final boolean _isFirstSplit;
                protected final int _splitCount;
 
-               protected int row = 0;
-               protected int col = 0;
+               protected int _row = 0;
+               protected int _col = 0;
 
                public CSVReadTask(InputSplit split, TextInputFormat informat, 
MatrixBlock dest, int splitCount) {
                        _split = split;
@@ -286,7 +287,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                        reader.next(key, value);
                                }
 
-                               row = _offsets.getOffsetPerSplit(_splitCount);
+                               _row = _offsets.getOffsetPerSplit(_splitCount);
 
                                long nnz = 0;
                                try {
@@ -300,8 +301,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
                        }
                        catch(Exception ex) {
                                // post-mortem error handling and bounds 
checking
-                               if(row < 0 || row + 1 > _rLen || col < 0 || col 
+ 1 > _cLen) {
-                                       String errMsg = "CSV cell [" + (row + 
1) + "," + (col + 1) + "] "
+                               if(_row < 0 || _row + 1 > _rLen || _col < 0 || 
_col + 1 > _cLen) {
+                                       String errMsg = "CSV cell [" + (_row + 
1) + "," + (_col + 1) + "] "
                                                + "out of overall matrix range 
[1:" + _rLen + ",1:" + _cLen + "]. " + ex.getMessage();
                                        throw new IOException(errMsg, ex);
                                }
@@ -317,8 +318,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
                        throws IOException;
 
                protected void verifyRows(Text value) throws IOException {
-                       if(row != (_offsets.getOffsetPerSplit(_splitCount) + 
_offsets.getLenghtPerSplit(_splitCount))) {
-                               throw new IOException("Incorrect number of rows 
(" + row + ") found in delimited file ("
+                       if(_row != (_offsets.getOffsetPerSplit(_splitCount) + 
_offsets.getLenghtPerSplit(_splitCount))) {
+                               throw new IOException("Incorrect number of rows 
(" + _row + ") found in delimited file ("
                                        + 
(_offsets.getOffsetPerSplit(_splitCount) + 
_offsets.getLenghtPerSplit(_splitCount)) + "): "
                                        + value);
                        }
@@ -332,17 +333,18 @@ public class ReaderTextCSVParallel extends MatrixReader {
                }
 
                protected long parse(RecordReader<LongWritable, Text> reader, 
LongWritable key, Text value) throws IOException {
-                       double[] a = _dest.getDenseBlockValues();
+                       DenseBlock a = _dest.getDenseBlock();
                        double cellValue = 0;
                        long nnz = 0;
                        boolean noFillEmpty = false;
-                       int index = row * (int) _cLen;
 
                        while(reader.next(key, value)) { // foreach line
                                final String cellStr = value.toString().trim();
                                final String[] parts = 
IOUtilFunctions.split(cellStr, _props.getDelim());
-                               for(String part : parts) { // foreach cell
-                                       part = part.trim();
+                               double[] avals = a.values(_row);
+                               int apos = a.pos(_row);
+                               for(int j = 0; j < _cLen; j++) { // foreach cell
+                                       String part = parts[j].trim();
                                        if(part.isEmpty()) {
                                                noFillEmpty |= !_props.isFill();
                                                cellValue = 
_props.getFillValue();
@@ -351,15 +353,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                                cellValue = 
Double.parseDouble(part);
                                        }
                                        if(cellValue != 0) {
-                                               a[index] = cellValue;
+                                               avals[apos+j] = cellValue;
                                                nnz++;
                                        }
-                                       index++;
                                }
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
                                
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
-                               row++;
+                               _row++;
                        }
 
                        return nnz;
@@ -374,16 +375,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
                }
 
                protected long parse(RecordReader<LongWritable, Text> reader, 
LongWritable key, Text value) throws IOException {
-                       double[] a = _dest.getDenseBlockValues();
+                       DenseBlock a = _dest.getDenseBlock();
                        double cellValue = 0;
                        boolean noFillEmpty = false;
-                       int index = row * (int) _cLen;
                        long nnz = 0;
                        while(reader.next(key, value)) { // foreach line
                                String cellStr = value.toString().trim();
                                String[] parts = IOUtilFunctions.split(cellStr, 
_props.getDelim());
-                               for(String part : parts) { // foreach cell
-                                       part = part.trim();
+                               double[] avals = a.values(_row);
+                               int apos = a.pos(_row);
+                               for(int j = 0; j < _cLen; j++) { // foreach cell
+                                       String part = parts[j].trim();
                                        if(part.isEmpty()) {
                                                noFillEmpty |= !_props.isFill();
                                                cellValue = 
_props.getFillValue();
@@ -392,15 +394,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                                cellValue = 
UtilFunctions.parseToDouble(part, _props.getNAStrings());
 
                                        if(cellValue != 0) {
-                                               a[index] = cellValue;
+                                               avals[apos+j] = cellValue;
                                                nnz++;
                                        }
-                                       index++;
                                }
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
                                
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
-                               row++;
+                               _row++;
                        }
                        return nnz;
                }
@@ -421,9 +422,9 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
                                final String cellStr = value.toString().trim();
                                final String[] parts = 
IOUtilFunctions.split(cellStr, _props.getDelim());
-                               col = 0;
-                               sb.allocate(row);
-                               SparseRow r = sb.get(row);
+                               _col = 0;
+                               sb.allocate(_row);
+                               SparseRow r = sb.get(_row);
 
                                for(String part : parts) {
                                        part = part.trim();
@@ -436,17 +437,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                        }
 
                                        if(cellValue != 0) {
-                                               r.append(col, cellValue);
+                                               r.append(_col, cellValue);
                                                nnz++;
                                        }
-                                       col++;
+                                       _col++;
                                }
 
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
                                
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
 
-                               row++;
+                               _row++;
                        }
                        return nnz;
                }
@@ -463,11 +464,11 @@ public class ReaderTextCSVParallel extends MatrixReader {
                        double cellValue = 0;
                        boolean noFillEmpty = false;
                        while(reader.next(key, value)) {
-                               col = 0;
+                               _col = 0;
                                final String cellStr = value.toString().trim();
                                final String[] parts = 
IOUtilFunctions.split(cellStr, _props.getDelim());
-                               sb.allocate(row);
-                               SparseRow r = sb.get(row);
+                               sb.allocate(_row);
+                               SparseRow r = sb.get(_row);
                                for(String part : parts) {
                                        part = part.trim();
                                        if(part.isEmpty()) {
@@ -479,17 +480,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                        }
 
                                        if(cellValue != 0) {
-                                               r.append(col, cellValue);
+                                               r.append(_col, cellValue);
                                                nnz++;
                                        }
-                                       col++;
+                                       _col++;
                                }
 
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
                                
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
 
-                               row++;
+                               _row++;
                        }
                        return nnz;
                }
@@ -506,25 +507,25 @@ public class ReaderTextCSVParallel extends MatrixReader {
                        long nnz = 0;
                        double cellValue = 0;
                        while(reader.next(key, value)) {
-                               col = 0;
+                               _col = 0;
                                final String cellStr = value.toString().trim();
                                final String[] parts = 
IOUtilFunctions.split(cellStr, _props.getDelim());
-                               sb.allocate(row);
-                               SparseRow r = sb.get(row);
+                               sb.allocate(_row);
+                               SparseRow r = sb.get(_row);
                                for(String part : parts) {
                                        if(!part.isEmpty()) {
                                                cellValue = 
Double.parseDouble(part);
                                                if(cellValue != 0) {
-                                                       r.append(col, 
cellValue);
+                                                       r.append(_col, 
cellValue);
                                                        nnz++;
                                                }
                                        }
-                                       col++;
+                                       _col++;
                                }
 
                                
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
 
-                               row++;
+                               _row++;
                        }
                        return nnz;
                }

Reply via email to