This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new b9eb4d8275 [SYSTEMDS-3688] Improved MatrixMarket parallel reader performance
b9eb4d8275 is described below

commit b9eb4d8275ada2d3a9daa91481926ecc1d11ec8d
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Mar 28 15:42:52 2024 +0100

    [SYSTEMDS-3688] Improved MatrixMarket parallel reader performance
    
    This patch improves the performance of the existing parallel reader
    for the matrix-market format. The changes include:
    * Parallel pre-allocation of sparse rows (sized by the counted
      non-zeros per row)
    * No value-buffer allocation for matrix-market pattern fields (all
      values are implicitly 1)
    * Direct append of buffered cells to the sparse block (instead of
      going through MatrixBlock.appendValue)
    
    On a machine with 24 physical / 48 virtual cores, this patch improved
    the read performance for road-network graph data as follows:
    
    * germany_osm.mtx (first read):           3.84s ->  2.94s
    * germany_osm.mtx (avg. of 10 reads):     2.97s ->  2.25s
    * europe_osm.mtx (first read):           13.67s -> 12.09s
    * europe_osm.mtx (avg. of 10 reads):     10.36s ->  8.71s
---
 .../apache/sysds/runtime/data/SparseBlockMCSR.java |  2 +-
 .../sysds/runtime/io/ReaderTextCellParallel.java   | 40 ++++++++++++++++------
 2 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java 
b/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java
index 70e36bf4cd..025da10394 100644
--- a/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java
+++ b/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java
@@ -381,7 +381,7 @@ public class SparseBlockMCSR extends SparseBlock
                if(v == 0)
                        return;
                else if(_rows[r] == null)
-                       _rows[r] = new SparseRowScalar().append(c, v);
+                       _rows[r] = new SparseRowScalar(c, v);
                else 
                        _rows[r] = _rows[r].append(c, v); 
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
index 23b6b738ca..9b80252040 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -91,6 +92,7 @@ public class ReaderTextCellParallel extends ReaderTextCell
                        long len = HDFSTool.getFilesizeOnHDFS(path);
                        par = ( len < MIN_FILESIZE_MM ) ? 1: par; 
                }
+               final int par2 = par;
                
                try 
                {
@@ -107,10 +109,13 @@ public class ReaderTextCellParallel extends ReaderTextCell
                                List<Future<Void>> rt1 = pool.invokeAll(tasks);
                                for( Future<Void> task : rt1 )
                                        task.get();
+                               //allocate sparse block and rows
                                SparseBlock sblock = 
dest.allocateBlock().getSparseBlock();
-                               for( int i=0; i<rlen; i++ )
-                                       if( rNnz[i] > 0 )
-                                               sblock.allocate(i, 
UtilFunctions.roundToNext(rNnz[i], 4));
+                               List<Future<?>> rt1b = IntStream.range(0, par)
+                                       .mapToObj(i -> pool.submit(() -> 
preallocateSparseRows(sblock, i, par2, rlen, rNnz)))
+                                       .collect(Collectors.toList());
+                               for( Future<?> task : rt1b )
+                                       task.get();
                        }
                        
                        //create and execute read tasks for all splits
@@ -135,6 +140,17 @@ public class ReaderTextCellParallel extends ReaderTextCell
                        throw new IOException("Threadpool issue, while parallel 
read.", e);
                }
        }
+       
+       private static void preallocateSparseRows(SparseBlock sblock, int i, 
int par, long rlen, int[] rNnz) {
+               int rl = (int) (i*rlen/par);
+               int ru = (int) Math.min((i+1)*rlen/par, rlen);
+               for( int j=rl; j<ru; j++ ) {
+                       if( rNnz[j] > 1 ) //sparse row
+                               sblock.allocate(j, 
UtilFunctions.roundToNext(rNnz[j], 4));
+                       else if( rNnz[j] == 1 ) //sparse scalar
+                               sblock.allocate(j, 1);
+               }
+       }
 
        public static class ReadTask implements Callable<Long> 
        {
@@ -194,7 +210,8 @@ public class ReaderTextCellParallel extends ReaderTextCell
                                }
 
                                if( _sparse ) { //SPARSE<-value
-                                       CellBuffer buff = new CellBuffer();
+                                       CellBuffer buff = new 
CellBuffer(!(_mmProps!=null && _mmProps.isPatternField()));
+                                       SparseBlock sblock = 
_dest.getSparseBlock();
                                        while( reader.next(key, value) ) {
                                                cell = 
parseCell(value.toString(), st, cell, _mmProps);
                                                buff.addCell(cell.getI(), 
cell.getJ(), cell.getV());
@@ -203,13 +220,13 @@ public class ReaderTextCellParallel extends ReaderTextCell
                                                if( 
buff.size()>=CellBuffer.CAPACITY ) 
                                                        synchronized( _dest ){ 
//sparse requires lock
                                                                lnnz += 
buff.size();
-                                                               
buff.flushCellBufferToMatrixBlock(_dest);
+                                                               
buff.flushCellBufferToSparseBlock(sblock);
                                                        }
                                        }
                                        //final buffer flush 
                                        synchronized( _dest ){ //sparse 
requires lock
                                                lnnz += buff.size();
-                                               
buff.flushCellBufferToMatrixBlock(_dest);
+                                               
buff.flushCellBufferToSparseBlock(sblock);
                                        }
                                } 
                                else { //DENSE<-value
@@ -311,10 +328,10 @@ public class ReaderTextCellParallel extends ReaderTextCell
                private double[] _vals;
                private int _pos;
                
-               public CellBuffer( ) {
+               public CellBuffer( boolean values ) {
                        _rlen = new int[CAPACITY];
                        _clen = new int[CAPACITY];
-                       _vals = new double[CAPACITY];
+                       _vals = values ? new double[CAPACITY] : null;
                        _pos = -1;
                }
                
@@ -323,12 +340,13 @@ public class ReaderTextCellParallel extends ReaderTextCell
                        _pos++;
                        _rlen[_pos] = rlen;
                        _clen[_pos] = clen;
-                       _vals[_pos] = val;
+                       if(_vals != null)
+                               _vals[_pos] = val;
                }
                
-               public void flushCellBufferToMatrixBlock( MatrixBlock dest ) {
+               public void flushCellBufferToSparseBlock( SparseBlock dest ) {
                        for( int i=0; i<=_pos; i++ )
-                               dest.appendValue(_rlen[i], _clen[i], _vals[i]);
+                               dest.append(_rlen[i], _clen[i], _vals!=null ? 
_vals[i] : 1);
                        reset();
                }
                

Reply via email to