This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new aeefc07  [MINOR] Bug fixes in multi-threaded transformapply
aeefc07 is described below

commit aeefc077575532e433bd2384d56829b777cab596
Author: arnabp <[email protected]>
AuthorDate: Fri Feb 25 20:10:22 2022 +0100

    [MINOR] Bug fixes in multi-threaded transformapply
---
 .../cp/ParameterizedBuiltinCPInstruction.java       |  3 ++-
 .../sysds/runtime/matrix/data/MatrixBlock.java      | 21 +++++++++++++++++++--
 .../runtime/transform/encode/ColumnEncoder.java     |  1 +
 .../transform/encode/MultiColumnEncoder.java        | 18 +++++++++++++-----
 4 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index be6c40a..02b1638 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.parser.ParameterizedBuiltinFunctionExpression;
 import org.apache.sysds.parser.Statement;
@@ -303,7 +304,7 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
                        // compute transformapply
                        MultiColumnEncoder encoder = EncoderFactory
                                .createEncoder(params.get("spec"), colNames, data.getNumColumns(), meta);
-                       MatrixBlock mbout = encoder.apply(data);
+                       MatrixBlock mbout = encoder.apply(data, OptimizerUtils.getTransformNumThreads());
 
                        // release locks
                        ec.setMatrixOutput(output.getName(), mbout);
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 2a8d74d..bc7f05f 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -503,7 +503,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
        public final long setNonZeros(long nnz) {
                return (nonZeros = nnz);
        }
-       
+
+       public final long setAllNonZeros() {
+               return (nonZeros = getLength());
+       }
+
        public final double getSparsity() {
                return OptimizerUtils.getSparsity(rlen, clen, nonZeros);
        }
@@ -661,6 +665,17 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                }
        }
 
+       public void denseSuperQuickSetValue(int r, int c, double v)
+       {
+               //early abort
+               if( denseBlock==null && v==0 )
+                       return;
+
+               denseBlock.set(r, c, v);
+               if( v==0 )
+                       nonZeros--;
+       }
+
        public double quickGetValueThreadSafe(int r, int c) {
                if(sparse) {
                        if(!(sparseBlock instanceof SparseBlockMCSR))
@@ -5464,12 +5479,13 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                        double v2 = that.quickGetValue(i, 0);
                        maxCol = ctable.execute(i+1, v2, w, maxCol, resultBlock);
                }
-               
+
                //update meta data (initially unknown number of columns)
                //note: nnz maintained in ctable (via quickset)
                if(updateClen) {
                        resultBlock.clen = maxCol;
                }
+
                return resultBlock;
        }
 
@@ -5613,6 +5629,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
        
        ////////
        // Data Generation Methods
+
        // (rand, sequence)
        
        /**
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index df673e9..bb543d8 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -172,6 +172,7 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
                        int lim = Math.min(i+B, rowEnd);
                        for (int ii=i; ii<lim; ii++)
                                out.quickSetValue(ii, outputCol, codes[ii-rowStart]);
+                               //out.denseSuperQuickSetValue(ii, outputCol, codes[ii-rowStart]);
                }
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 00c7962..019a140 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -246,7 +246,7 @@ public class MultiColumnEncoder implements Encoder {
        public void build(CacheBlock in, int k) {
                if(hasLegacyEncoder() && !(in instanceof FrameBlock))
                        throw new DMLRuntimeException("LegacyEncoders do not support non FrameBlock Inputs");
-               if(_nPartitions == null) //happens if this method is directly called from the tests
+               if(_nPartitions == null) //happens if this method is directly called
                        _nPartitions = getNumRowPartitions(in, k);
                if(k > 1) {
                        buildMT(in, k);
@@ -294,6 +294,9 @@ public class MultiColumnEncoder implements Encoder {
        }
 
        public MatrixBlock apply(CacheBlock in, int k) {
+               // domain sizes are not updated if called from transformapply
+               for(ColumnEncoderComposite columnEncoder : _columnEncoders)
+                       columnEncoder.updateAllDCEncoders();
                int numCols = in.getNumColumns() + getNumExtraCols();
                long estNNz = (long) in.getNumColumns() * (long) in.getNumRows();
                boolean sparse = MatrixBlock.evalSparseFormatInMemory(in.getNumRows(), numCols, estNNz);
@@ -320,6 +323,8 @@ public class MultiColumnEncoder implements Encoder {
                        hasDC = columnEncoder.hasEncoder(ColumnEncoderDummycode.class);
                outputMatrixPreProcessing(out, in, hasDC);
                if(k > 1) {
+                       if(_nPartitions == null) //happens if this method is directly called
+                               _nPartitions = getNumRowPartitions(in, k);
                        applyMT(in, out, outputCol, k);
                }
                else {
@@ -403,11 +408,11 @@ public class MultiColumnEncoder implements Encoder {
                                nBuild++;
                int nApply = in.getNumColumns();
                // #BuildBlocks = (2 * #PhysicalCores)/#build
-               if (numBlocks[0] == 0 && nBuild < nThread)
+               if (numBlocks[0] == 0 && nBuild > 0 && nBuild < nThread)
                        numBlocks[0] = Math.round(((float)nThread)/nBuild);
                // #ApplyBlocks = (4 * #PhysicalCores)/#apply
-               if (numBlocks[1] == 0 && nApply < nThread*2)
-                       numBlocks[1] = Math.round(((float)nThread*2)/nBuild);
+               if (numBlocks[1] == 0 && nApply > 0 && nApply < nThread*2)
+                       numBlocks[1] = Math.round(((float)nThread*2)/nApply);
 
                // Reduce #blocks if #rows per partition is too small
                while (numBlocks[0] > 1 && nRow/numBlocks[0] < minNumRows)
@@ -469,8 +474,11 @@ public class MultiColumnEncoder implements Encoder {
                                output.setSparseBlock(csrblock);
                        }
                }
-               else //dense
+               else {
+                       // Allocate dense block and set nnz to total #entries
                        output.allocateBlock();
+                       //output.setAllNonZeros();
+               }
 
                if(DMLScript.STATISTICS) {
                        LOG.debug("Elapsed time for allocation: "+ ((double) System.nanoTime() - t0) / 1000000 + " ms");

Reply via email to