This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 12f283fda7 [SYSTEMDS-3293] Materialize partition counts in the encoder objects
12f283fda7 is described below

commit 12f283fda7e53f961322c87ea7e5f0ab279a7b13
Author: arnabp <[email protected]>
AuthorDate: Fri Apr 29 10:00:31 2022 +0200

    [SYSTEMDS-3293] Materialize partition counts in the encoder objects
    
    This patch refactors the current code to derive the optimal number of
    build and apply blocks and push them into the encoder objects.
    This change allows us to vary the partition counts per column.
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |  4 +-
 .../runtime/transform/encode/ColumnEncoder.java    | 23 ++++++-----
 .../transform/encode/ColumnEncoderComposite.java   | 17 ++++++---
 .../transform/encode/ColumnEncoderDummycode.java   |  2 +-
 .../transform/encode/ColumnEncoderFeatureHash.java |  2 +-
 .../transform/encode/ColumnEncoderPassThrough.java |  2 +-
 .../transform/encode/MultiColumnEncoder.java       | 44 +++++++++++-----------
 .../TransformFrameBuildMultithreadedTest.java      |  2 +-
 8 files changed, 52 insertions(+), 44 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java 
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 8863faf9bd..1b730f6a3c 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -143,8 +143,8 @@ public class DMLConfig
                _defaultVals.put(CP_PARALLEL_IO,         "true" );
                _defaultVals.put(PARALLEL_ENCODE,        "false" );
                _defaultVals.put(PARALLEL_ENCODE_STAGED, "false" );
-               _defaultVals.put(PARALLEL_ENCODE_APPLY_BLOCKS, "1");
-               _defaultVals.put(PARALLEL_ENCODE_BUILD_BLOCKS, "1");
+               _defaultVals.put(PARALLEL_ENCODE_APPLY_BLOCKS, "-1");
+               _defaultVals.put(PARALLEL_ENCODE_BUILD_BLOCKS, "-1");
                _defaultVals.put(PARALLEL_ENCODE_NUM_THREADS, "-1");
                _defaultVals.put(COMPRESSED_LINALG,      
Compression.CompressConfig.FALSE.name() );
                _defaultVals.put(COMPRESSED_LOSSY,       "false" );
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index 9c7832c2bf..4e969896c3 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -61,6 +61,8 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
        protected int _colID;
        protected ArrayList<Integer> _sparseRowsWZeros = null;
        protected long _estMetaSize = 0;
+       protected int _nBuildPartitions = 0;
+       protected int _nApplyPartitions = 0;
 
        protected enum TransformType{
                BIN, RECODE, DUMMYCODE, FEATURE_HASH, PASS_THROUGH, N_A
@@ -300,11 +302,11 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
         * complete if all previous tasks are done. This is so that we can use 
the last task as a dependency for the whole
         * build, reducing unnecessary dependencies.
         */
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int 
nBuildPartition) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
                List<Callable<Object>> tasks = new ArrayList<>();
                List<List<? extends Callable<?>>> dep = null;
                int nRows = in.getNumRows();
-               int[] blockSizes = getBlockSizes(nRows, nBuildPartition);
+               int[] blockSizes = getBlockSizes(nRows, _nBuildPartitions);
                if(blockSizes.length == 1) {
                        tasks.add(getBuildTask(in));
                }
@@ -335,17 +337,17 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
        }
 
 
-       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int nApplyPartitions, int outputCol) {
+       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int outputCol) {
                List<Callable<Object>> tasks = new ArrayList<>();
                List<List<? extends Callable<?>>> dep = null;
-               int[] blockSizes = getBlockSizes(in.getNumRows(), 
nApplyPartitions);
+               int[] blockSizes = getBlockSizes(in.getNumRows(), 
_nApplyPartitions);
                for(int startRow = 0, i = 0; i < blockSizes.length; 
startRow+=blockSizes[i], i++){
                        if(out.isInSparseFormat())
                                tasks.add(getSparseTask(in, out, outputCol, 
startRow, blockSizes[i]));
                        else
                                tasks.add(getDenseTask(in, out, outputCol, 
startRow, blockSizes[i]));
                }
-               if(tasks.size() > 1){
+               if(tasks.size() > 1) {
                        dep = new ArrayList<>(Collections.nCopies(tasks.size(), 
null));
                        tasks.add(() -> null);  // Empty task as barrier
                        dep.add(tasks.subList(0, tasks.size()-1));
@@ -380,15 +382,12 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
                }
        }
 
-       protected int getNumApplyRowPartitions(){
-               return ConfigurationManager.getParallelApplyBlocks();
+       protected void setBuildRowBlocksPerColumn(int nPart) {
+               _nBuildPartitions = nPart;
        }
 
-       protected int getNumBuildRowPartitions(){
-               if (BUILD_ROW_BLOCKS_PER_COLUMN == -1)
-                       return ConfigurationManager.getParallelBuildBlocks();
-               else
-                       return BUILD_ROW_BLOCKS_PER_COLUMN;
+       protected void setApplyRowBlocksPerColumn(int nPart) {
+               _nApplyPartitions = nPart;
        }
 
        public enum EncoderType {
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 59dd3157c0..a22cab19ab 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -106,17 +106,17 @@ public class ColumnEncoderComposite extends ColumnEncoder 
{
        }
 
        @Override
-       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int nParition, int outputCol) {
+       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int outputCol) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                List<Integer> sizes = new ArrayList<>();
                for(int i = 0; i < _columnEncoders.size(); i++) {
                        List<DependencyTask<?>> t;
                        if(i == 0) {
                                // 1. encoder writes data into MatrixBlock 
Column all others use this column for further encoding
-                               t = _columnEncoders.get(i).getApplyTasks(in, 
out, nParition, outputCol);
+                               t = _columnEncoders.get(i).getApplyTasks(in, 
out, outputCol);
                        }
                        else {
-                               t = _columnEncoders.get(i).getApplyTasks(out, 
out, nParition, outputCol);
+                               t = _columnEncoders.get(i).getApplyTasks(out, 
out, outputCol);
                        }
                        if(t == null)
                                continue;
@@ -143,11 +143,11 @@ public class ColumnEncoderComposite extends ColumnEncoder 
{
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int 
nPartition) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                Map<Integer[], Integer[]> depMap = null;
                for(ColumnEncoder columnEncoder : _columnEncoders) {
-                       List<DependencyTask<?>> t = 
columnEncoder.getBuildTasks(in, nPartition);
+                       List<DependencyTask<?>> t = 
columnEncoder.getBuildTasks(in);
                        if(t == null)
                                continue;
                        // Linear execution between encoders so they can't be 
built in parallel
@@ -368,6 +368,13 @@ public class ColumnEncoderComposite extends ColumnEncoder {
                setEstMetaSize(totEstSize);
        }
 
+       public void setNumPartitions(int nBuild, int nApply) {
+                       _columnEncoders.forEach(e -> {
+                               e.setBuildRowBlocksPerColumn(nBuild);
+                               e.setApplyRowBlocksPerColumn(nApply);
+                       });
+       }
+
        @Override
        public void shiftCol(int columnOffset) {
                super.shiftCol(columnOffset);
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index 3366329a40..e0efe53e92 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -65,7 +65,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int 
nParition) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
                return null;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index aa362fef74..9445474446 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -93,7 +93,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int 
nParition) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
                return null;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 04df3d5b9f..0b95734e2f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -51,7 +51,7 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int 
nParition) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
                return null;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index fe49097296..b34a152fc7 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -76,7 +76,7 @@ public class MultiColumnEncoder implements Encoder {
        private EncoderOmit _legacyOmit = null;
        private int _colOffset = 0; // offset for federated Workers who are 
using subrange encoders
        private FrameBlock _meta = null;
-       private int[] _nPartitions = null;
+       private boolean _partitionDone = false;
 
        public MultiColumnEncoder(List<ColumnEncoderComposite> columnEncoders) {
                _columnEncoders = columnEncoders;
@@ -92,7 +92,7 @@ public class MultiColumnEncoder implements Encoder {
 
        public MatrixBlock encode(CacheBlock in, int k) {
                MatrixBlock out;
-               _nPartitions = getNumRowPartitions(in, k);
+               deriveNumRowPartitions(in, k);
                try {
                        if(k > 1 && !MULTI_THREADED_STAGES && 
!hasLegacyEncoder()) {
                                out = new MatrixBlock();
@@ -159,7 +159,7 @@ public class MultiColumnEncoder implements Encoder {
 
                for(ColumnEncoderComposite e : _columnEncoders) {
                        // Create the build tasks
-                       List<DependencyTask<?>> buildTasks = 
e.getBuildTasks(in, _nPartitions[0]);
+                       List<DependencyTask<?>> buildTasks = 
e.getBuildTasks(in);
                        tasks.addAll(buildTasks);
                        if(buildTasks.size() > 0) {
                                // Check if any Build independent UpdateDC task 
(Bin+DC, FH+DC)
@@ -201,7 +201,7 @@ public class MultiColumnEncoder implements Encoder {
                        // Apply Task depends on InitOutputMatrixTask (output 
allocation)
                        depMap.put(new Integer[] {tasks.size(), tasks.size() + 
1},         //ApplyTask
                                        new Integer[] {0, 1});                  
                   //Allocation task (1st task)
-                       ApplyTasksWrapperTask applyTaskWrapper = new 
ApplyTasksWrapperTask(e, in, out, _nPartitions[1], pool);
+                       ApplyTasksWrapperTask applyTaskWrapper = new 
ApplyTasksWrapperTask(e, in, out, pool);
 
                        if(e.hasEncoder(ColumnEncoderDummycode.class)) {
                                // Allocation depends on build if DC is in the 
list.
@@ -248,8 +248,8 @@ public class MultiColumnEncoder implements Encoder {
        public void build(CacheBlock in, int k) {
                if(hasLegacyEncoder() && !(in instanceof FrameBlock))
                        throw new DMLRuntimeException("LegacyEncoders do not 
support non FrameBlock Inputs");
-               if(_nPartitions == null) //happens if this method is directly 
called
-                       _nPartitions = getNumRowPartitions(in, k);
+               if(!_partitionDone) //happens if this method is directly called
+                       deriveNumRowPartitions(in, k);
                if(k > 1) {
                        buildMT(in, k);
                }
@@ -266,7 +266,7 @@ public class MultiColumnEncoder implements Encoder {
        private List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
-                       tasks.addAll(columnEncoder.getBuildTasks(in, 
_nPartitions[0]));
+                       tasks.addAll(columnEncoder.getBuildTasks(in));
                }
                return tasks;
        }
@@ -325,8 +325,8 @@ public class MultiColumnEncoder implements Encoder {
                        hasDC = 
columnEncoder.hasEncoder(ColumnEncoderDummycode.class);
                outputMatrixPreProcessing(out, in, hasDC);
                if(k > 1) {
-                       if(_nPartitions == null) //happens if this method is 
directly called
-                               _nPartitions = getNumRowPartitions(in, k);
+                       if(!_partitionDone) //happens if this method is 
directly called
+                               deriveNumRowPartitions(in, k);
                        applyMT(in, out, outputCol, k);
                }
                else {
@@ -348,11 +348,11 @@ public class MultiColumnEncoder implements Encoder {
                return out;
        }
 
-       private List<DependencyTask<?>> getApplyTasks(CacheBlock in, 
MatrixBlock out, int nPartition, int outputCol) {
+       private List<DependencyTask<?>> getApplyTasks(CacheBlock in, 
MatrixBlock out, int outputCol) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                int offset = outputCol;
                for(ColumnEncoderComposite e : _columnEncoders) {
-                       tasks.addAll(e.getApplyTasks(in, out, nPartition, 
e._colID - 1 + offset));
+                       tasks.addAll(e.getApplyTasks(in, out, e._colID - 1 + 
offset));
                        if(e.hasEncoder(ColumnEncoderDummycode.class))
                                offset += 
e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
                }
@@ -365,12 +365,12 @@ public class MultiColumnEncoder implements Encoder {
                        if(APPLY_ENCODER_SEPARATE_STAGES){
                                int offset = outputCol;
                                for (ColumnEncoderComposite e : 
_columnEncoders) {
-                                       
pool.submitAllAndWait(e.getApplyTasks(in, out, _nPartitions[1], e._colID - 1 + 
offset));
+                                       
pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset));
                                        if 
(e.hasEncoder(ColumnEncoderDummycode.class))
                                                offset += 
e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
                                }
                        }else{
-                               pool.submitAllAndWait(getApplyTasks(in, out, 
_nPartitions[1], outputCol));
+                               pool.submitAllAndWait(getApplyTasks(in, out, 
outputCol));
                        }
                }
                catch(ExecutionException | InterruptedException e) {
@@ -380,12 +380,14 @@ public class MultiColumnEncoder implements Encoder {
                pool.shutdown();
        }
 
-       private int[] getNumRowPartitions(CacheBlock in, int k) {
+       private void deriveNumRowPartitions(CacheBlock in, int k) {
                int[] numBlocks = new int[2];
                if (k == 1) { //single-threaded
                        numBlocks[0] = 1;
                        numBlocks[1] = 1;
-                       return numBlocks;
+                       _columnEncoders.forEach(e -> e.setNumPartitions(1, 1));
+                       _partitionDone = true;
+                       return;
                }
                // Read from global flags. These are set by the unit tests
                if (ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN > 0)
@@ -443,7 +445,9 @@ public class MultiColumnEncoder implements Encoder {
                        if (numBlocks[i] == 0)
                                numBlocks[i] = 1; //default 1
 
-               return numBlocks;
+               _partitionDone = true;
+               // Materialize the partition counts in the encoders
+               _columnEncoders.forEach(e -> e.setNumPartitions(numBlocks[0], 
numBlocks[1]));
        }
 
        private void estimateRCMapSize(CacheBlock in, 
List<ColumnEncoderComposite> rcList) {
@@ -454,7 +458,7 @@ public class MultiColumnEncoder implements Encoder {
                int seed = (int) System.nanoTime();
                int[] sampleInds = 
CompressedSizeEstimatorSample.getSortedSample(in.getNumRows(), sampleSize, 
seed, 1);
 
-               // Concurrent (col-wise) recode map size estimation
+               // Concurrent (column-wise) recode map size estimation
                ExecutorService myPool = CommonThreadPool.get(k);
                try {
                        myPool.submit(() -> {
@@ -1046,22 +1050,20 @@ public class MultiColumnEncoder implements Encoder {
                private final ColumnEncoder _encoder;
                private final MatrixBlock _out;
                private final CacheBlock _in;
-               private final int _nApplyPartition;
                private int _offset = -1; // offset dude to dummycoding in
                                                                        // 
previous columns needs to be updated by external task!
 
                private ApplyTasksWrapperTask(ColumnEncoder encoder, CacheBlock 
in, 
-                               MatrixBlock out, int nPart, 
DependencyThreadPool pool) {
+                               MatrixBlock out, DependencyThreadPool pool) {
                        super(pool);
                        _encoder = encoder;
                        _out = out;
                        _in = in;
-                       _nApplyPartition = nPart;
                }
 
                @Override
                public List<DependencyTask<?>> getWrappedTasks() {
-                       return _encoder.getApplyTasks(_in, _out, 
_nApplyPartition, _encoder._colID - 1 + _offset);
+                       return _encoder.getApplyTasks(_in, _out, 
_encoder._colID - 1 + _offset);
                }
 
                @Override
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
index 27d7c785df..c18f184f2b 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
@@ -188,7 +188,7 @@ public class TransformFrameBuildMultithreadedTest extends 
AutomatedTestBase {
                                .readFrameFromHDFS(DATASET, -1L, -1L);
                        StringBuilder specSb = new StringBuilder();
                        Files.readAllLines(Paths.get(SPEC)).forEach(s -> 
specSb.append(s).append("\n"));
-                       ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = 
Math.max(blockSize, 1);
+                       ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = 
Math.max(blockSize, -1);
                        MultiColumnEncoder encoderS = 
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
                                input.getNumColumns(), null);
                        MultiColumnEncoder encoderM = 
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),

Reply via email to