This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new cdff113  [SYSTEMDS-2972] Transformencode sparse improvements
cdff113 is described below

commit cdff113648371b2dace5905dc0eb81ebf486f094
Author: Lukas Erlbacher <[email protected]>
AuthorDate: Wed Sep 8 19:58:50 2021 +0200

    [SYSTEMDS-2972] Transformencode sparse improvements
    
    This PR introduces sparse implementations for the transform encoders.
    Furthermore, it adds support for row partitioning, statistics, and
    debug logging. Multi-threaded transformencode can now be enabled via
    the configuration flag "sysds.parallel.encode" (see the sketch below).
    
    Closes #1388, closes #1383
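    
    For reference, a minimal sketch of enabling the new flag (assuming the
    usual -config SystemDS-config.xml mechanism with its <root> element;
    the key name and its default of "false" are taken from the DMLConfig
    hunks below):
    
        <root>
            <sysds.parallel.encode>true</sysds.parallel.encode>
        </root>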
---
 .../apache/sysds/conf/ConfigurationManager.java    |   4 +
 src/main/java/org/apache/sysds/conf/DMLConfig.java |   4 +-
 .../java/org/apache/sysds/hops/OptimizerUtils.java |  18 +++
 .../runtime/compress/CompressedMatrixBlock.java    |   5 -
 ...ltiReturnParameterizedBuiltinCPInstruction.java |   4 +-
 .../sysds/runtime/matrix/data/MatrixBlock.java     |  35 +----
 .../runtime/transform/encode/ColumnEncoder.java    | 141 +++++++++++++-----
 .../runtime/transform/encode/ColumnEncoderBin.java |  90 ++++++++++--
 .../transform/encode/ColumnEncoderComposite.java   |  26 ++--
 .../transform/encode/ColumnEncoderDummycode.java   | 115 +++++++--------
 .../transform/encode/ColumnEncoderFeatureHash.java |  64 +++++++--
 .../transform/encode/ColumnEncoderPassThrough.java |  90 ++++++++++--
 .../transform/encode/ColumnEncoderRecode.java      |  81 +++++++++--
 .../runtime/transform/encode/EncoderFactory.java   |  26 ++--
 .../runtime/transform/encode/EncoderMVImpute.java  |  36 +++--
 .../runtime/transform/encode/EncoderOmit.java      |  22 +--
 .../transform/encode/MultiColumnEncoder.java       |  91 ++++++++----
 .../apache/sysds/runtime/util/DependencyTask.java  |  24 +++-
 .../sysds/runtime/util/DependencyThreadPool.java   |  12 +-
 .../apache/sysds/runtime/util/UtilFunctions.java   |   9 ++
 .../java/org/apache/sysds/utils/Statistics.java    | 159 +++++++++++++++++++--
 .../mt/TransformFrameBuildMultithreadedTest.java   |   2 +
 .../mt/TransformFrameEncodeMultithreadedTest.java  |   4 +-
 .../datasets/homes3/homes.tfspec_dummy_all.json    |   1 -
 .../datasets/homes3/homes.tfspec_dummy_sparse.json |   1 +
 25 files changed, 805 insertions(+), 259 deletions(-)
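
For orientation, a condensed sketch of the call path this patch wires up,
assembled from the MultiReturnParameterizedBuiltinCPInstruction and
OptimizerUtils hunks below. The wrapper class and method names are
illustrative only and not part of the patch:

    import org.apache.sysds.common.Types.ValueType;
    import org.apache.sysds.hops.OptimizerUtils;
    import org.apache.sysds.runtime.matrix.data.FrameBlock;
    import org.apache.sysds.runtime.matrix.data.MatrixBlock;
    import org.apache.sysds.runtime.transform.encode.EncoderFactory;
    import org.apache.sysds.runtime.transform.encode.MultiColumnEncoder;

    public class TransformEncodeSketch {
        public static MatrixBlock transformEncode(String spec, String[] colnames, FrameBlock fin) {
            // create the composite encoder from the transform specification
            MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null);
            // -1 means no external constraint; returns 1 unless sysds.parallel.encode is true
            int k = OptimizerUtils.getTransformNumThreads(-1);
            // build and apply, multi-threaded if k > 1
            MatrixBlock data = encoder.encode(fin, k);
            // collect the transform metadata frame
            FrameBlock meta = encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING));
            meta.setColumnNames(colnames);
            return data;
        }
    }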

diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java 
b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 5a3667c..93654b7 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -171,6 +171,10 @@ public class ConfigurationManager
                return 
getCompilerConfigFlag(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS);
        }
        
+       public static boolean isParallelTransform() {
+               return 
getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE);
+       }
+       
        public static boolean isParallelParFor() {
                return 
getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR);
        }
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java 
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 0b7692b..db59505 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -67,6 +67,7 @@ public class DMLConfig
        public static final String DEFAULT_BLOCK_SIZE   = 
"sysds.defaultblocksize";
        public static final String CP_PARALLEL_OPS      = 
"sysds.cp.parallel.ops";
        public static final String CP_PARALLEL_IO       = 
"sysds.cp.parallel.io";
+       public static final String PARALLEL_ENCODE      = 
"sysds.parallel.encode";  // boolean: enable multi-threaded transformencode and 
apply
        public static final String COMPRESSED_LINALG    = 
"sysds.compressed.linalg";
        public static final String COMPRESSED_LOSSY     = 
"sysds.compressed.lossy";
        public static final String COMPRESSED_VALID_COMPRESSIONS = 
"sysds.compressed.valid.compressions";
@@ -125,6 +126,7 @@ public class DMLConfig
                _defaultVals.put(DEFAULT_BLOCK_SIZE,     
String.valueOf(OptimizerUtils.DEFAULT_BLOCKSIZE) );
                _defaultVals.put(CP_PARALLEL_OPS,        "true" );
                _defaultVals.put(CP_PARALLEL_IO,         "true" );
+               _defaultVals.put(PARALLEL_ENCODE,        "false" );
                _defaultVals.put(COMPRESSED_LINALG,      
Compression.CompressConfig.FALSE.name() );
                _defaultVals.put(COMPRESSED_LOSSY,       "false" );
                _defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC");
@@ -398,7 +400,7 @@ public class DMLConfig
        public String getConfigInfo()  {
                String[] tmpConfig = new String[] { 
                        LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL, 
DEFAULT_BLOCK_SIZE,
-                       CP_PARALLEL_OPS, CP_PARALLEL_IO, NATIVE_BLAS, 
NATIVE_BLAS_DIR,
+                       CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE, 
NATIVE_BLAS, NATIVE_BLAS_DIR,
                        COMPRESSED_LINALG, COMPRESSED_LOSSY, 
COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING,
                        COMPRESSED_SAMPLING_RATIO, COMPRESSED_COCODE, 
COMPRESSED_TRANSPOSE,
                        CODEGEN, CODEGEN_API, CODEGEN_COMPILER, 
CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS,
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 01769c7..d63b2cb 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -1008,6 +1008,24 @@ public class OptimizerUtils
                        
                return ret;
        }
+
+       public static int getTransformNumThreads(int maxNumThreads)
+       {
+               //by default max local parallelism (vcores) 
+               int ret = InfrastructureAnalyzer.getLocalParallelism();
+               
+               //apply external max constraint (e.g., set by parfor or other 
rewrites)
+               if( maxNumThreads > 0 ) {
+                       ret = Math.min(ret, maxNumThreads);
+               }
+               
+               //check if enabled in config.xml
+               if( !ConfigurationManager.isParallelTransform() ) {
+                       ret = 1;
+               }
+                       
+               return ret;
+       }
        
        public static Level getDefaultLogLevel() {
                Level log = Logger.getRootLogger().getLevel();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 2c205f3..d374d3c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -1345,11 +1345,6 @@ public class CompressedMatrixBlock extends MatrixBlock {
        }
 
        @Override
-       public void quickSetValueThreadSafe(int r, int c, double v) {
-               throw new DMLCompressionException("Thread safe execution does 
not work on Compressed Matrix");
-       }
-
-       @Override
        public double quickGetValueThreadSafe(int r, int c) {
                throw new DMLCompressionException("Thread safe execution does 
not work on Compressed Matrix");
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index b6e0d97..800aa51 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -85,7 +86,8 @@ public class MultiReturnParameterizedBuiltinCPInstruction 
extends ComputationCPI
 
                // execute block transform encode
                MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, 
colnames, fin.getNumColumns(), null);
-               MatrixBlock data = encoder.encode(fin); // build and apply
+               // TODO: Assign #threads in compiler and pass via the 
instruction string
+               MatrixBlock data = encoder.encode(fin, 
OptimizerUtils.getTransformNumThreads(-1)); // build and apply
                FrameBlock meta = encoder.getMetaData(new 
FrameBlock(fin.getNumColumns(), ValueType.STRING));
                meta.setColumnNames(colnames);
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 160f8e9..a86a878 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -117,7 +117,7 @@ import org.apache.sysds.utils.NativeHelper;
 
 public class MatrixBlock extends MatrixValue implements CacheBlock, 
Externalizable {
        // private static final Log LOG = 
LogFactory.getLog(MatrixBlock.class.getName());
-       
+
        private static final long serialVersionUID = 7319972089143154056L;
        
        //sparsity nnz threshold, based on practical experiments on space 
consumption and performance
@@ -654,27 +654,6 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                }
        }
 
-       /**
-        * Thread save set.
-        * Blocks need to be allocated, and in case of MCSR sparse, all rows 
-        * that are going to be accessed need to be allocated as well.
-        * 
-        * @param r row 
-        * @param c column 
-        * @param v value
-        */
-       public void quickSetValueThreadSafe(int r, int c, double v) {
-               if(sparse) {
-                       if(!(sparseBlock instanceof SparseBlockMCSR))
-                               throw new RuntimeException("Only MCSR Blocks 
are supported for Multithreaded sparse set.");
-                       synchronized (sparseBlock.get(r)) {
-                               sparseBlock.set(r,c,v);
-                       }
-               }
-               else
-                       denseBlock.set(r,c,v);
-       }
-
        public double quickGetValueThreadSafe(int r, int c) {
                if(sparse) {
                        if(!(sparseBlock instanceof SparseBlockMCSR))
@@ -976,7 +955,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
 
        /**
         * Wrapper method for single threaded reduceall-colSum of a matrix.
-        * 
+        *
         * @return A new MatrixBlock containing the column sums of this matrix.
         */
        public MatrixBlock colSum() {
@@ -986,7 +965,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
 
        /**
         * Wrapper method for single threaded reduceall-rowSum of a matrix.
-        * 
+        *
         * @return A new MatrixBlock containing the row sums of this matrix.
         */
        public MatrixBlock rowSum(){
@@ -1422,7 +1401,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        throw new RuntimeException( "Copy must not overwrite 
itself!" );
                if(that instanceof CompressedMatrixBlock)
                        that = CompressedMatrixBlock.getUncompressed(that, 
"Copy not effecient into a MatrixBlock");
-               
+
                rlen=that.rlen;
                clen=that.clen;
                sparse=sp;
@@ -2935,7 +2914,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                LibMatrixBincell.isValidDimensionsBinary(this, that);
                if(thatValue instanceof CompressedMatrixBlock)
                        return ((CompressedMatrixBlock) 
thatValue).binaryOperationsLeft(op, this, result);
-               
+
                //compute output dimensions
                boolean outer = (LibMatrixBincell.getBinaryAccessType(this, 
that)
                                == BinaryAccessType.OUTER_VECTOR_VECTOR); 
@@ -2980,7 +2959,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        m2 = ((CompressedMatrixBlock) 
m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName());
                if(m3 instanceof CompressedMatrixBlock)
                        m3 = ((CompressedMatrixBlock) 
m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName());
-               
+
                //prepare inputs
                final boolean s1 = (rlen==1 && clen==1);
                final boolean s2 = (m2.rlen==1 && m2.clen==1);
@@ -3674,7 +3653,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                                                "Invalid nCol dimension for 
append rbind: was " + in[i].clen + " should be: " + clen);
                }
        }
-       
+
        public static MatrixBlock naryOperations(Operator op, MatrixBlock[] 
matrices, ScalarObject[] scalars, MatrixBlock ret) {
                //note: currently only min max, plus supported and hence 
specialized implementation
                //prepare operator
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index 33bf452..e04eeff 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.transform.encode;
 
 import static 
org.apache.sysds.runtime.transform.encode.EncoderFactory.getEncoderType;
+import static org.apache.sysds.runtime.util.UtilFunctions.getBlockSizes;
 
 import java.io.Externalizable;
 import java.io.IOException;
@@ -45,6 +46,8 @@ import org.apache.sysds.runtime.util.DependencyThreadPool;
  */
 public abstract class ColumnEncoder implements Externalizable, Encoder, 
Comparable<ColumnEncoder> {
        protected static final Log LOG = 
LogFactory.getLog(ColumnEncoder.class.getName());
+       protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1;
+       public static int BUILD_ROW_BLOCKS_PER_COLUMN = 1;
        private static final long serialVersionUID = 2299156350718979064L;
        protected int _colID;
 
@@ -52,7 +55,23 @@ public abstract class ColumnEncoder implements 
Externalizable, Encoder, Comparab
                _colID = colID;
        }
 
-       public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol);
+       /**
+        * Apply Functions are only used in Single Threaded or Multi-Threaded 
Dense context.
+        * That's why there is no regard for MT sparse!
+        *
+        * @param in Input Block
+        * @param out Output Matrix
+        * @param outputCol The output column for the given column
+        * @return same as out
+        *
+        */
+       public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol){
+               return apply(in, out, outputCol, 0, -1);
+       }
+
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol){
+               return apply(in, out, outputCol, 0, -1);
+       }
 
        public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol, int rowStart, int blk);
 
@@ -172,18 +191,18 @@ public abstract class ColumnEncoder implements 
Externalizable, Encoder, Comparab
         * complete if all previous tasks are done. This is so that we can use 
the last task as a dependency for the whole
         * build, reducing unnecessary dependencies.
         */
-       public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int 
blockSize) {
+       public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
                List<Callable<Object>> tasks = new ArrayList<>();
                List<List<? extends Callable<?>>> dep = null;
-               if(blockSize <= 0 || blockSize >= in.getNumRows()) {
+               int nRows = in.getNumRows();
+               int[] blockSizes = getBlockSizes(nRows, 
getNumBuildRowPartitions());
+               if(blockSizes.length == 1) {
                        tasks.add(getBuildTask(in));
                }
                else {
                        HashMap<Integer, Object> ret = new HashMap<>();
-                       for(int i = 0; i < in.getNumRows(); i = i + blockSize)
-                               tasks.add(getPartialBuildTask(in, i, blockSize, 
ret));
-                       if(in.getNumRows() % blockSize != 0)
-                               tasks.add(getPartialBuildTask(in, 
in.getNumRows() - in.getNumRows() % blockSize, -1, ret));
+                       for(int startRow = 0, i = 0; i < blockSizes.length; 
startRow+=blockSizes[i], i++)
+                               tasks.add(getPartialBuildTask(in, startRow, 
blockSizes[i], ret));
                        tasks.add(getPartialMergeBuildTask(ret));
                        dep = new ArrayList<>(Collections.nCopies(tasks.size() 
- 1, null));
                        dep.add(tasks.subList(0, tasks.size() - 1));
@@ -198,24 +217,63 @@ public abstract class ColumnEncoder implements 
Externalizable, Encoder, Comparab
        public Callable<Object> getPartialBuildTask(FrameBlock in, int 
startRow, int blockSize,
                HashMap<Integer, Object> ret) {
                throw new DMLRuntimeException(
-                       "Trying to get the PartialBuild task of an Encoder 
which does not support  " + "partial building");
+                       "Trying to get the PartialBuild task of an Encoder 
which does not support partial building");
        }
 
        public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?> 
ret) {
                throw new DMLRuntimeException(
-                       "Trying to get the BuildMergeTask task of an Encoder 
which does not support " + "partial building");
+                       "Trying to get the BuildMergeTask task of an Encoder 
which does not support partial building");
        }
 
        public List<DependencyTask<?>> getApplyTasks(FrameBlock in, MatrixBlock 
out, int outputCol) {
-               List<Callable<Object>> tasks = new ArrayList<>();
-               tasks.add(new ColumnApplyTask(this, in, out, outputCol));
-               return DependencyThreadPool.createDependencyTasks(tasks, null);
+               return getApplyTasks(in, null, out, outputCol);
        }
 
        public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, 
MatrixBlock out, int outputCol) {
+               return getApplyTasks(null, in, out, outputCol);
+       }
+
+       private List<DependencyTask<?>> getApplyTasks(FrameBlock inF, 
MatrixBlock inM, MatrixBlock out, int outputCol){
                List<Callable<Object>> tasks = new ArrayList<>();
-               tasks.add(new ColumnApplyTask(this, in, out, outputCol));
-               return DependencyThreadPool.createDependencyTasks(tasks, null);
+               List<List<? extends Callable<?>>> dep = null;
+               if ((inF != null && inM != null) || (inF == null && inM == 
null))
+                       throw new DMLRuntimeException("getApplyTasks needs to 
be called with either FrameBlock input " +
+                                       "or MatrixBlock input");
+               int nRows = inF == null ? inM.getNumRows() : inF.getNumRows();
+               int[] blockSizes = getBlockSizes(nRows, 
getNumApplyRowPartitions());
+               for(int startRow = 0, i = 0; i < blockSizes.length; 
startRow+=blockSizes[i], i++){
+                       if(inF != null)
+                               if(out.isInSparseFormat())
+                                       tasks.add(getSparseTask(inF, out, 
outputCol, startRow, blockSizes[i]));
+                               else
+                                       tasks.add(new ColumnApplyTask<>(this, 
inF, out, outputCol, startRow, blockSizes[i]));
+                       else
+                       if(out.isInSparseFormat())
+                               tasks.add(getSparseTask(inM, out, outputCol, 
startRow, blockSizes[i]));
+                       else
+                               tasks.add(new ColumnApplyTask<>(this, inM, out, 
outputCol, startRow, blockSizes[i]));
+               }
+               if(tasks.size() > 1){
+                       dep = new ArrayList<>(Collections.nCopies(tasks.size(), 
null));
+                       tasks.add(() -> null);  // Empty task as barrier
+                       dep.add(tasks.subList(0, tasks.size()-1));
+               }
+
+               return DependencyThreadPool.createDependencyTasks(tasks, dep);
+       }
+
+       protected abstract ColumnApplyTask<? extends ColumnEncoder> 
+                       getSparseTask(FrameBlock in, MatrixBlock out, int 
outputCol, int startRow, int blk);
+
+       protected abstract ColumnApplyTask<? extends ColumnEncoder> 
+                       getSparseTask(MatrixBlock in, MatrixBlock out, int 
outputCol, int startRow, int blk);
+
+       protected int getNumApplyRowPartitions(){
+               return APPLY_ROW_BLOCKS_PER_COLUMN;
+       }
+
+       protected int getNumBuildRowPartitions(){
+               return BUILD_ROW_BLOCKS_PER_COLUMN;
        }
 
        public enum EncoderType {
@@ -226,39 +284,54 @@ public abstract class ColumnEncoder implements 
Externalizable, Encoder, Comparab
         * This is the base Task for each column apply. If no custom 
"getApplyTasks" is implemented in an Encoder this task
         * will be used.
         */
-       private static class ColumnApplyTask implements Callable<Object> {
+       protected static class ColumnApplyTask<T extends ColumnEncoder> 
implements Callable<Object> {
+
+               protected final T _encoder;
+               protected final FrameBlock _inputF;
+               protected final MatrixBlock _inputM;
+               protected final MatrixBlock _out;
+               protected final int _outputCol;
+               protected final int _startRow;
+               protected final int _blk;
+
+               protected ColumnApplyTask(T encoder, FrameBlock input, 
MatrixBlock out, int outputCol){
+                       this(encoder, input, out, outputCol, 0, -1);
+               }
 
-               private final ColumnEncoder _encoder;
-               private final FrameBlock _inputF;
-               private final MatrixBlock _inputM;
-               private final MatrixBlock _out;
-               private final int _outputCol;
+               protected ColumnApplyTask(T encoder, MatrixBlock input, 
MatrixBlock out, int outputCol){
+                       this(encoder, input, out, outputCol, 0, -1);
+               }
 
-               protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock 
input, MatrixBlock out, int outputCol) {
-                       _encoder = encoder;
-                       _inputF = input;
-                       _inputM = null;
-                       _out = out;
-                       _outputCol = outputCol;
+               protected ColumnApplyTask(T encoder, FrameBlock input, 
MatrixBlock out, int outputCol, int startRow, int blk) {
+                       this(encoder, input, null, out, outputCol, startRow, 
blk);
                }
 
-               protected ColumnApplyTask(ColumnEncoder encoder, MatrixBlock 
input, MatrixBlock out, int outputCol) {
+               protected ColumnApplyTask(T encoder, MatrixBlock input, 
MatrixBlock out, int outputCol, int startRow, int blk) {
+                       this(encoder, null, input, out, outputCol, startRow, 
blk);
+               }
+               private  ColumnApplyTask(T encoder, FrameBlock inputF, 
MatrixBlock inputM, MatrixBlock out, int outputCol,
+                                                                int startRow, 
int blk){
                        _encoder = encoder;
-                       _inputM = input;
-                       _inputF = null;
+                       _inputM = inputM;
+                       _inputF = inputF;
                        _out = out;
                        _outputCol = outputCol;
+                       _startRow = startRow;
+                       _blk = blk;
                }
 
                @Override
-               public Void call() throws Exception {
+               public Object call() throws Exception {
                        assert _outputCol >= 0;
-                       int _rowStart = 0;
-                       int _blk = -1;
+                       if(_out.isInSparseFormat()){
+                               // this is an issue since most sparse Tasks 
modify the sparse structure so normal get and set calls are
+                               // not possible.
+                               throw new DMLRuntimeException("ColumnApplyTask 
called although output is in sparse format.");
+                       }
                        if(_inputF == null)
-                               _encoder.apply(_inputM, _out, _outputCol, 
_rowStart, _blk);
+                               _encoder.apply(_inputM, _out, _outputCol, 
_startRow, _blk);
                        else
-                               _encoder.apply(_inputF, _out, _outputCol, 
_rowStart, _blk);
+                               _encoder.apply(_inputF, _out, _outputCol, 
_startRow, _blk);
                        return null;
                }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index b4d4800..5736c1e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -28,11 +28,15 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.MutableTriple;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderBin extends ColumnEncoder {
        public static final String MIN_PREFIX = "min";
@@ -84,10 +88,13 @@ public class ColumnEncoderBin extends ColumnEncoder {
 
        @Override
        public void build(FrameBlock in) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                if(!isApplicable())
                        return;
                double[] pairMinMax = getMinMaxOfCol(in, _colID, 0, -1);
                computeBins(pairMinMax[0], pairMinMax[1]);
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
        }
 
        private static double[] getMinMaxOfCol(FrameBlock in, int colID, int 
startRow, int blockSize) {
@@ -118,6 +125,8 @@ public class ColumnEncoderBin extends ColumnEncoder {
                return new BinMergePartialBuildTask(this, ret);
        }
 
+
+
        public void computeBins(double min, double max) {
                // ensure allocated internal transformation metadata
                if(_binMins == null || _binMaxs == null) {
@@ -146,39 +155,47 @@ public class ColumnEncoderBin extends ColumnEncoder {
        }
 
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) 
{
-               return apply(in, out, outputCol, 0, -1);
-       }
-
-       @Override
-       public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol) {
-               return apply(in, out, outputCol, 0, -1);
-       }
-
-       @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, 
int rowStart, int blk) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                for(int i = rowStart; i < getEndIndex(in.getNumRows(), 
rowStart, blk); i++) {
                        double inVal = 
UtilFunctions.objectToDouble(in.getSchema()[_colID - 1], in.get(i, _colID - 1));
                        int ix = Arrays.binarySearch(_binMaxs, inVal);
                        int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1;
-                       out.quickSetValueThreadSafe(i, outputCol, binID);
+                       out.quickSetValue(i, outputCol, binID);
                }
+               if (DMLScript.STATISTICS)
+                       
Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
                return out;
        }
 
        @Override
        public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol, int rowStart, int blk) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                int end = getEndIndex(in.getNumRows(), rowStart, blk);
                for(int i = rowStart; i < end; i++) {
                        double inVal = in.quickGetValueThreadSafe(i, _colID - 
1);
                        int ix = Arrays.binarySearch(_binMaxs, inVal);
                        int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1;
-                       out.quickSetValueThreadSafe(i, outputCol, binID);
+                       out.quickSetValue(i, outputCol, binID);
                }
+               if (DMLScript.STATISTICS)
+                       
Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
                return out;
        }
 
        @Override
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               return new BinSparseApplyTask(this, in, out, outputCol);
+       }
+
+       @Override
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               throw new NotImplementedException("Sparse Binning for 
MatrixBlocks not yet implemented");
+       }
+
+       @Override
        public void mergeAt(ColumnEncoder other) {
                if(other instanceof ColumnEncoderBin) {
                        ColumnEncoderBin otherBin = (ColumnEncoderBin) other;
@@ -264,6 +281,43 @@ public class ColumnEncoderBin extends ColumnEncoder {
                }
        }
 
+       private static class BinSparseApplyTask extends 
ColumnApplyTask<ColumnEncoderBin> {
+
+               public BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock 
input, 
+                               MatrixBlock out, int outputCol, int startRow, 
int blk) {
+                       super(encoder, input, out, outputCol, startRow, blk);
+               }
+
+               private BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock 
input, MatrixBlock out, int outputCol) {
+                       super(encoder, input, out, outputCol);
+               }
+
+               public Object call() throws Exception {
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+                       int index = _encoder._colID - 1;
+                       if(_out.getSparseBlock() == null)
+                               return null;
+                       assert _inputF != null;
+                       for(int r = _startRow; r < 
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+                               SparseRowVector row = (SparseRowVector) 
_out.getSparseBlock().get(r);
+                               double inVal = 
UtilFunctions.objectToDouble(_inputF.getSchema()[index], _inputF.get(r, index));
+                               int ix = Arrays.binarySearch(_encoder._binMaxs, 
inVal);
+                               int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) 
+ 1;
+                               row.values()[index] = binID;
+                               row.indexes()[index] = _outputCol;
+                       }
+                       if(DMLScript.STATISTICS)
+                               
Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
+                       return null;
+               }
+
+               @Override
+               public String toString() {
+                       return getClass().getSimpleName() + "<ColId: " + 
_encoder._colID + ">";
+               }
+
+       }
+
        private static class BinPartialBuildTask implements Callable<Object> {
 
                private final FrameBlock _input;
@@ -284,7 +338,13 @@ public class ColumnEncoderBin extends ColumnEncoder {
 
                @Override
                public double[] call() throws Exception {
-                       _partialMinMax.put(_startRow, getMinMaxOfCol(_input, 
_colID, _startRow, _blockSize));
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+                       double[] minMax = getMinMaxOfCol(_input, _colID, 
_startRow, _blockSize);
+                       synchronized (_partialMinMax){
+                               _partialMinMax.put(_startRow, minMax);
+                       }
+                       if (DMLScript.STATISTICS)
+                               
Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
                        return null;
                }
 
@@ -306,6 +366,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 
                @Override
                public Object call() throws Exception {
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                        double min = Double.POSITIVE_INFINITY;
                        double max = Double.NEGATIVE_INFINITY;
                        for(Object minMax : _partialMaps.values()) {
@@ -313,6 +374,9 @@ public class ColumnEncoderBin extends ColumnEncoder {
                                max = Math.max(max, ((double[]) minMax)[1]);
                        }
                        _encoder.computeBins(min, max);
+
+                       if(DMLScript.STATISTICS)
+                               
Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
                        return null;
                }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 54b8795..f7611c3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -139,11 +139,23 @@ public class ColumnEncoderComposite extends ColumnEncoder 
{
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int 
blockSize) {
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                Map<Integer[], Integer[]> depMap = null;
                for(ColumnEncoder columnEncoder : _columnEncoders) {
-                       List<DependencyTask<?>> t = 
columnEncoder.getBuildTasks(in, blockSize);
+                       List<DependencyTask<?>> t = 
columnEncoder.getBuildTasks(in);
                        if(t == null)
                                continue;
                        // Linear execution between encoders so they can't be 
built in parallel
@@ -179,16 +191,6 @@ public class ColumnEncoderComposite extends ColumnEncoder {
        }
 
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) 
{
-               return apply(in, out, outputCol, 0, -1);
-       }
-
-       @Override
-       public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol) {
-               return apply(in, out, outputCol, 0, -1);
-       }
-
-       @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, 
int rowStart, int blk) {
                try {
                        for(int i = 0; i < _columnEncoders.size(); i++) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index 25b2eb9..1047f54 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -24,17 +24,15 @@ import static 
org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.Callable;
 
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
-import org.apache.sysds.runtime.util.DependencyThreadPool;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderDummycode extends ColumnEncoder {
        private static final long serialVersionUID = 5832130477659116489L;
@@ -60,26 +58,18 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int 
blockSize) {
+       public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
                return null;
        }
 
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) 
{
-               return apply(in, out, outputCol, 0, -1);
-       }
-
-       public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol) {
-               return apply(in, out, outputCol, 0, -1);
-       }
-
-       @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, 
int rowStart, int blk) {
                throw new DMLRuntimeException("Called DummyCoder with 
FrameBlock");
        }
 
        @Override
        public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol, int rowStart, int blk) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                // Out Matrix should already be correct size!
                // append dummy coded or unchanged values to output
                for(int i = rowStart; i < getEndIndex(in.getNumRows(), 
rowStart, blk); i++) {
@@ -89,20 +79,25 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
                        int nCol = outputCol + (int) val - 1;
                        // Setting value to 0 first in case of sparse so the 
row vector does not need to be resized
                        if(nCol != outputCol)
-                               out.quickSetValueThreadSafe(i, outputCol, 0);
-                       out.quickSetValueThreadSafe(i, nCol, 1);
+                               out.quickSetValue(i, outputCol, 0);
+                       out.quickSetValue(i, nCol, 1);
                }
+               if (DMLScript.STATISTICS)
+                       
Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0);
                return out;
        }
 
+
+       @Override
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               return new DummycodeSparseApplyTask(this, in, out, outputCol, 
startRow, blk);
+       }
+
        @Override
-       public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, 
MatrixBlock out, int outputCol) {
-               List<Callable<Object>> tasks = new ArrayList<>();
-               if(out.isInSparseFormat())
-                       tasks.add(new DummycodeSparseApplyTask(this, in, out, 
outputCol));
-               else
-                       return super.getApplyTasks(in, out, outputCol);
-               return DependencyThreadPool.createDependencyTasks(tasks, null);
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               throw new DMLRuntimeException("Called DummyCoder with 
FrameBlock");
        }
 
        @Override
@@ -139,6 +134,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 
                        if(distinct != -1) {
                                _domainSize = distinct;
+                               LOG.debug("DummyCoder for column: " + _colID + 
" has domain size: " + _domainSize);
                        }
                }
        }
@@ -188,48 +184,47 @@ public class ColumnEncoderDummycode extends ColumnEncoder 
{
                return _domainSize;
        }
 
-       private static class DummycodeSparseApplyTask implements 
Callable<Object> {
-               private final ColumnEncoderDummycode _encoder;
-               private final MatrixBlock _input;
-               private final MatrixBlock _out;
-               private final int _outputCol;
+       private static class DummycodeSparseApplyTask extends 
ColumnApplyTask<ColumnEncoderDummycode> {
+
+               protected DummycodeSparseApplyTask(ColumnEncoderDummycode 
encoder, MatrixBlock input, 
+                               MatrixBlock out, int outputCol) {
+                       super(encoder, input, out, outputCol);
+               }
 
-               private DummycodeSparseApplyTask(ColumnEncoderDummycode 
encoder, MatrixBlock input, MatrixBlock out,
-                       int outputCol) {
-                       _encoder = encoder;
-                       _input = input;
-                       _out = out;
-                       _outputCol = outputCol;
+               protected DummycodeSparseApplyTask(ColumnEncoderDummycode 
encoder, MatrixBlock input, 
+                               MatrixBlock out, int outputCol, int startRow, 
int blk) {
+                       super(encoder, input, out, outputCol, startRow, blk);
                }
 
                public Object call() throws Exception {
-                       for(int r = 0; r < _input.getNumRows(); r++) {
-                               if(_out.getSparseBlock() == null)
-                                       return null;
-                               synchronized(_out.getSparseBlock().get(r)) {
-                                       // Since the recoded values are already 
offset in the output matrix (same as input at this point)
-                                       // the dummycoding only needs to offset 
them within their column domain. Which means that the
-                                       // indexes in the SparseRowVector do 
not need to be sorted anymore and can be updated directly.
-                                       //
-                                       // Input: Output:
-                                       //
-                                       // 1 | 0 | 2 | 0 1 | 0 | 0 | 1
-                                       // 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0
-                                       // 1 | 0 | 2 | 0 1 | 0 | 0 | 1
-                                       // 1 | 0 | 1 | 0 1 | 0 | 1 | 0
-                                       //
-                                       // Example SparseRowVector Internals 
(1. row):
-                                       //
-                                       // indexes = [0,2] ===> indexes = [0,3]
-                                       // values = [1,2] values = [1,1]
-                                       int index = ((SparseRowVector) 
_out.getSparseBlock().get(r)).getIndex(_outputCol);
-                                       double val = 
_out.getSparseBlock().get(r).values()[index];
-                                       int nCol = _outputCol + (int) val - 1;
-
-                                       
_out.getSparseBlock().get(r).indexes()[index] = nCol;
-                                       
_out.getSparseBlock().get(r).values()[index] = 1;
-                               }
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+                       assert _inputM != null;
+                       if(_out.getSparseBlock() == null)
+                               return null;
+                       for(int r = _startRow; r < 
getEndIndex(_inputM.getNumRows(), _startRow, _blk); r++) {
+                               // Since the recoded values are already offset 
in the output matrix (same as input at this point)
+                               // the dummycoding only needs to offset them 
within their column domain. Which means that the
+                               // indexes in the SparseRowVector do not need 
to be sorted anymore and can be updated directly.
+                               //
+                               // Input: Output:
+                               //
+                               // 1 | 0 | 2 | 0                1 | 0 | 0 | 1
+                               // 2 | 0 | 1 | 0 ===>   0 | 1 | 1 | 0
+                               // 1 | 0 | 2 | 0                1 | 0 | 0 | 1
+                               // 1 | 0 | 1 | 0                1 | 0 | 1 | 0
+                               //
+                               // Example SparseRowVector Internals (1. row):
+                               //
+                               // indexes = [0,2] ===> indexes = [0,3]
+                               // values = [1,2] values = [1,1]
+                               int index = _encoder._colID - 1;
+                               double val = 
_out.getSparseBlock().get(r).values()[index];
+                               int nCol = _outputCol + (int) val - 1;
+                               _out.getSparseBlock().get(r).indexes()[index] = 
nCol;
+                               _out.getSparseBlock().get(r).values()[index] = 
1;
                        }
+                       if (DMLScript.STATISTICS)
+                               
Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0);
                        return null;
                }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 1c74ae5..d30d8dc 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -26,11 +26,15 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.List;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 
 /**
  * Class used for feature hashing transformation of frames.
@@ -56,7 +60,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
        }
 
        private long getCode(String key) {
-               return key.hashCode() % _K;
+               return (key.hashCode() % _K) + 1;
        }
 
        @Override
@@ -65,22 +69,25 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder 
{
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int 
blockSize) {
+       public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
                return null;
        }
 
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) 
{
-               return apply(in, out, outputCol, 0, -1);
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               return new FeatureHashSparseApplyTask(this, in, out, outputCol, 
startRow, blk);
        }
 
        @Override
-       public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol) {
-               return apply(in, out, outputCol, 0, -1);
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               throw new NotImplementedException("Sparse FeatureHashing for 
MatrixBlocks not yet implemented");
        }
 
        @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, 
int rowStart, int blk) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                // apply feature hashing column wise
                for(int i = rowStart; i < getEndIndex(in.getNumRows(), 
rowStart, blk); i++) {
                        Object okey = in.get(i, _colID - 1);
@@ -88,21 +95,26 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder 
{
                        if(key == null)
                                throw new DMLRuntimeException("Missing Value 
encountered in input Frame for FeatureHash");
                        long code = getCode(key);
-                       out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? 
code : Double.NaN);
+                       out.quickSetValue(i, outputCol, (code >= 0) ? code : 
Double.NaN);
                }
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
                return out;
        }
 
        @Override
        public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol, int rowStart, int blk) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                int end = getEndIndex(in.getNumRows(), rowStart, blk);
                // apply feature hashing column wise
                for(int i = rowStart; i < end; i++) {
                        Object okey = in.quickGetValueThreadSafe(i, _colID - 1);
                        String key = okey.toString();
                        long code = getCode(key);
-                       out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? 
code : Double.NaN);
+                       out.quickSetValue(i, outputCol, (code >= 0) ? code : 
Double.NaN);
                }
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
                return out;
        }
 
@@ -145,4 +157,40 @@ public class ColumnEncoderFeatureHash extends 
ColumnEncoder {
                super.readExternal(in);
                _K = in.readLong();
        }
+
+       public static class FeatureHashSparseApplyTask extends 
ColumnApplyTask<ColumnEncoderFeatureHash>{
+
+               public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash 
encoder, FrameBlock input, 
+                               MatrixBlock out, int outputCol, int startRow, 
int blk) {
+                       super(encoder, input, out, outputCol, startRow, blk);
+               }
+
+               public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash 
encoder, FrameBlock input, 
+                               MatrixBlock out, int outputCol) {
+                       super(encoder, input, out, outputCol);
+               }
+
+               @Override
+               public Object call() throws Exception {
+                       if(_out.getSparseBlock() == null)
+                               return null;
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+                       int index = _encoder._colID - 1;
+                       assert _inputF != null;
+                       for(int r = _startRow; r < 
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++){
+                               SparseRowVector row = (SparseRowVector) 
_out.getSparseBlock().get(r);
+                               Object okey = _inputF.get(r, index);
+                               String key = (okey != null) ? okey.toString() : 
null;
+                               if(key == null)
+                                       throw new DMLRuntimeException("Missing 
Value encountered in input Frame for FeatureHash");
+                               long code = _encoder.getCode(key);
+                               row.values()[index] = code;
+                               row.indexes()[index] = _outputCol;
+                       }
+                       if(DMLScript.STATISTICS)
+                               
Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
+                       return null;
+               }
+       }
+
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 7a8df24..5c7392c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -21,16 +21,22 @@ package org.apache.sysds.runtime.transform.encode;
 
 import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderPassThrough extends ColumnEncoder {
        private static final long serialVersionUID = -8473768154646831882L;
+       private List<Integer> sparseRowsWZeros = null;
 
        protected ColumnEncoderPassThrough(int ptCols) {
                super(ptCols); // 1-based
@@ -40,37 +46,46 @@ public class ColumnEncoderPassThrough extends ColumnEncoder 
{
                this(-1);
        }
 
+       public List<Integer> getSparseRowsWZeros(){
+               return sparseRowsWZeros;
+       }
+
        @Override
        public void build(FrameBlock in) {
                // do nothing
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int 
blockSize) {
+       public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
                return null;
        }
 
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) 
{
-               return apply(in, out, outputCol, 0, -1);
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               return new PassThroughSparseApplyTask(this, in, out, outputCol, 
startRow, blk);
        }
 
        @Override
-       public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol) {
-               return apply(in, out, outputCol, 0, -1);
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+                       throw new NotImplementedException("Sparse PassThrough for MatrixBlocks not yet implemented");
        }
 
        @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, 
int rowStart, int blk) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                int col = _colID - 1; // 1-based
                ValueType vt = in.getSchema()[col];
                for(int i = rowStart; i < getEndIndex(in.getNumRows(), 
rowStart, blk); i++) {
                        Object val = in.get(i, col);
                        double v = (val == null ||
-                               (vt == ValueType.STRING && 
val.toString().isEmpty())) ? Double.NaN : UtilFunctions.objectToDouble(vt,
-                                       val);
-                       out.quickSetValueThreadSafe(i, outputCol, v);
+                               (vt == ValueType.STRING && 
val.toString().isEmpty())) 
+                                       ? Double.NaN : 
UtilFunctions.objectToDouble(vt, val);
+                       out.quickSetValue(i, outputCol, v);
                }
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
                return out;
        }
 
@@ -79,12 +94,15 @@ public class ColumnEncoderPassThrough extends ColumnEncoder 
{
                // only transfer from in to out
                if(in == out)
                        return out;
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                int col = _colID - 1; // 1-based
                int end = getEndIndex(in.getNumRows(), rowStart, blk);
                for(int i = rowStart; i < end; i++) {
                        double val = in.quickGetValueThreadSafe(i, col);
-                       out.quickSetValueThreadSafe(i, outputCol, val);
+                       out.quickSetValue(i, outputCol, val);
                }
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
                return out;
        }
 
@@ -106,4 +124,58 @@ public class ColumnEncoderPassThrough extends 
ColumnEncoder {
        public void initMetaData(FrameBlock meta) {
                // do nothing
        }
+
+       public static class PassThroughSparseApplyTask extends 
ColumnApplyTask<ColumnEncoderPassThrough>{
+
+
+               protected PassThroughSparseApplyTask(ColumnEncoderPassThrough 
encoder, FrameBlock input, 
+                               MatrixBlock out, int outputCol) {
+                       super(encoder, input, out, outputCol);
+               }
+
+               protected PassThroughSparseApplyTask(ColumnEncoderPassThrough 
encoder, FrameBlock input, MatrixBlock out, 
+                               int outputCol, int startRow, int blk) {
+                       super(encoder, input, out, outputCol, startRow, blk);
+               }
+
+               @Override
+               public Object call() throws Exception {
+                       if(_out.getSparseBlock() == null)
+                               return null;
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+                       int index = _encoder._colID - 1;
+                       assert _inputF != null;
+                       List<Integer> sparseRowsWZeros = null;
+                       ValueType vt = _inputF.getSchema()[index];
+                       for(int r = _startRow; r < 
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+                               Object val = _inputF.get(r, index);
+                               double v = (val == null || (vt == 
ValueType.STRING && val.toString().isEmpty())) ?
+                                               Double.NaN : 
UtilFunctions.objectToDouble(vt, val);
+                               SparseRowVector row = (SparseRowVector) 
_out.getSparseBlock().get(r);
+                               if(v == 0) {
+                                       if(sparseRowsWZeros == null)
+                                               sparseRowsWZeros = new 
ArrayList<>();
+                                       sparseRowsWZeros.add(r);
+                               }
+                               row.values()[index] = v;
+                               row.indexes()[index] = _outputCol;
+                       }
+                       if(sparseRowsWZeros != null){
+                               synchronized (_encoder){
+                                       if(_encoder.sparseRowsWZeros == null)
+                                               _encoder.sparseRowsWZeros = new 
ArrayList<>();
+                                       
_encoder.sparseRowsWZeros.addAll(sparseRowsWZeros);
+                               }
+                       }
+                       if(DMLScript.STATISTICS)
+                               
Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
+                       return null;
+               }
+
+               public String toString() {
+                       return getClass().getSimpleName() + "<ColId: " + 
_encoder._colID + ">";
+               }
+
+       }
+
 }
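
A note on sparseRowsWZeros above: because every sparse row is pre-sized to the full number of input columns, a pass-through value of 0 ends up as an explicit zero entry and has to be compacted away in post-processing (see MultiColumnEncoder further down). Below is a simplified stand-alone model of that compaction, using plain arrays instead of SystemDS' SparseRowVector.

import java.util.Arrays;

// Simplified model (not SystemDS code): a "sparse row" holding parallel
// value/index arrays of fixed size; compact() drops explicit zero entries.
public class SparseRowModel {
	double[] values;
	int[] indexes;
	int size;

	SparseRowModel(int capacity) {
		values = new double[capacity];
		indexes = new int[capacity];
		size = capacity; // pre-sized, as in outputMatrixPreProcessing
	}

	void compact() {
		int k = 0;
		for (int i = 0; i < size; i++)
			if (values[i] != 0) {
				values[k] = values[i];
				indexes[k] = indexes[i];
				k++;
			}
		size = k;
	}

	public static void main(String[] args) {
		SparseRowModel row = new SparseRowModel(3);
		row.values = new double[] {1.0, 0.0, 7.0}; // slot 1 received a pass-through zero
		row.indexes = new int[] {0, 1, 2};
		row.compact();
		System.out.println(row.size + " non-zeros: "
			+ Arrays.toString(Arrays.copyOf(row.values, row.size)));
	}
}
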
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index fd18d86..e190d74 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -33,10 +33,13 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderRecode extends ColumnEncoder {
        private static final long serialVersionUID = 8213163881283341874L;
@@ -135,7 +138,11 @@ public class ColumnEncoderRecode extends ColumnEncoder {
        public void build(FrameBlock in) {
                if(!isApplicable())
                        return;
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                makeRcdMap(in, _rcdMap, _colID, 0, in.getNumRows());
+               if(DMLScript.STATISTICS){
+                       
Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+               }
        }
 
        @Override
@@ -186,18 +193,17 @@ public class ColumnEncoderRecode extends ColumnEncoder {
        }
 
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) 
{
-               return apply(in, out, outputCol, 0, -1);
-       }
-
-       @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, 
int rowStart, int blk) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                // FrameBlock is column Major and MatrixBlock row Major this 
results in cache inefficiencies :(
                for(int i = rowStart; i < getEndIndex(in.getNumRows(), 
rowStart, blk); i++) {
                        Object okey = in.get(i, _colID - 1);
                        String key = (okey != null) ? okey.toString() : null;
                        long code = lookupRCDMap(key);
-                       out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? 
code : Double.NaN);
+                       out.quickSetValue(i, outputCol, (code >= 0) ? code : 
Double.NaN);
+               }
+               if(DMLScript.STATISTICS){
+                       
Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0);
                }
                return out;
        }
@@ -209,9 +215,16 @@ public class ColumnEncoderRecode extends ColumnEncoder {
        }
 
        @Override
-       public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int 
outputCol) {
-               throw new DMLRuntimeException(
-                       "Recode called with MatrixBlock. Should not happen 
since Recode is the first " + "encoder in the Stack");
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk){
+               return new RecodeSparseApplyTask(this, in ,out, outputCol, 
startRow, blk);
+       }
+
+       @Override
+       protected ColumnApplyTask<? extends ColumnEncoder> 
+               getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, 
int startRow, int blk) {
+               throw new DMLRuntimeException("Recode called with MatrixBlock. 
Should not happen since Recode is the first " +
+                               "encoder in the Stack");
        }
 
        @Override
@@ -313,6 +326,48 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                return _rcdMap;
        }
 
+       private static class RecodeSparseApplyTask extends 
ColumnApplyTask<ColumnEncoderRecode>{
+
+               public RecodeSparseApplyTask(ColumnEncoderRecode encoder, 
FrameBlock input, MatrixBlock out, int outputCol) {
+                       super(encoder, input, out, outputCol);
+               }
+
+               protected RecodeSparseApplyTask(ColumnEncoderRecode encoder, 
FrameBlock input, MatrixBlock out, 
+                               int outputCol, int startRow, int blk) {
+                       super(encoder, input, out, outputCol, startRow, blk);
+               }
+
+               public Object call() throws Exception {
+                       int index = _encoder._colID - 1;
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+                       if(_out.getSparseBlock() == null)
+                               return null;
+                       assert _inputF != null;
+                       for(int r = _startRow; r < 
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+                               SparseRowVector row = (SparseRowVector) 
_out.getSparseBlock().get(r);
+                               Object okey = _inputF.get(r, index);
+                               String key = (okey != null) ? okey.toString() : 
null;
+                               long code = _encoder.lookupRCDMap(key);
+                               double val = (code < 0) ? Double.NaN : code;
+                               row.values()[index] = val;
+                               row.indexes()[index] = _outputCol;
+                       }
+                       if(DMLScript.STATISTICS){
+                               
Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0);
+                       }
+                       return null;
+               }
+
+               @Override
+               public String toString() {
+                       String str = getClass().getSimpleName() + "<ColId: " + 
_encoder._colID + ">";
+                       if(_blk != -1)
+                               str+= "<Sr: " + _startRow + ">";
+                       return str;
+               }
+
+       }
+
        private static class RecodePartialBuildTask implements Callable<Object> 
{
 
                private final FrameBlock _input;
@@ -332,11 +387,15 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 
                @Override
                public HashMap<String, Long> call() throws Exception {
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                        HashMap<String, Long> partialMap = new HashMap<>();
                        makeRcdMap(_input, partialMap, _colID, _startRow, 
_blockSize);
                        synchronized(_partialMaps) {
                                _partialMaps.put(_startRow, partialMap);
                        }
+                       if(DMLScript.STATISTICS){
+                               
Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+                       }
                        return null;
                }
 
@@ -358,6 +417,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 
                @Override
                public Object call() throws Exception {
+                       long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                        HashMap<String, Long> rcdMap = _encoder.getRcdMap();
                        _partialMaps.forEach((start_row, map) -> {
                                ((HashMap<?, ?>) map).forEach((k, v) -> {
@@ -366,6 +426,9 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                                });
                        });
                        _encoder._rcdMap = rcdMap;
+                       if(DMLScript.STATISTICS){
+                               
Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+                       }
                        return null;
                }
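
The body of the recode merge task is split across hunks here; the general idea is that each per-block partial map is folded into the encoder's recode map, with previously unseen keys receiving the next free code. A hedged stand-alone sketch of that merge (the exact code-assignment order in SystemDS may differ):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch: merging per-block partial recode maps into one global map.
public class RecodeMergeSketch {
	public static void main(String[] args) {
		// two partial maps, e.g. built from different row blocks
		Map<String, Long> p1 = new HashMap<>();
		p1.put("NY", 1L); p1.put("CA", 2L);
		Map<String, Long> p2 = new HashMap<>();
		p2.put("CA", 1L); p2.put("TX", 2L);
		List<Map<String, Long>> partialMaps = new ArrayList<>();
		partialMaps.add(p1); partialMaps.add(p2);

		// merge: keys seen for the first time get the next free code
		Map<String, Long> rcdMap = new HashMap<>();
		for (Map<String, Long> partial : partialMaps)
			for (String key : partial.keySet())
				rcdMap.putIfAbsent(key, (long) (rcdMap.size() + 1));

		// three distinct keys -> codes 1..3 (assignment order depends on map iteration)
		System.out.println(rcdMap);
	}
}
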
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
index 4b48d2a..012379a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
@@ -19,16 +19,8 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
-import static org.apache.sysds.runtime.util.CollectionUtils.except;
-import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
@@ -36,9 +28,19 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
 import org.apache.sysds.runtime.transform.encode.ColumnEncoder.EncoderType;
 import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONObject;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import static org.apache.sysds.runtime.util.CollectionUtils.except;
+import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct;
+
 public class EncoderFactory {
 
        public static MultiColumnEncoder createEncoder(String spec, String[] 
colnames, int clen, FrameBlock meta) {
@@ -125,16 +127,22 @@ public class EncoderFactory {
                                }
                        // create composite decoder of all created encoders
                        for(Entry<Integer, List<ColumnEncoder>> listEntry : 
colEncoders.entrySet()) {
+                               if(DMLScript.STATISTICS)
+                                       
Statistics.incTransformEncoderCount(listEntry.getValue().size());
                                lencoders.add(new 
ColumnEncoderComposite(listEntry.getValue()));
                        }
                        encoder = new MultiColumnEncoder(lencoders);
                        if(!oIDs.isEmpty()) {
                                encoder.addReplaceLegacyEncoder(new 
EncoderOmit(jSpec, colnames, schema.length, minCol, maxCol));
+                               if(DMLScript.STATISTICS)
+                                       Statistics.incTransformEncoderCount(1);
                        }
                        if(!mvIDs.isEmpty()) {
                                EncoderMVImpute ma = new EncoderMVImpute(jSpec, 
colnames, schema.length, minCol, maxCol);
                                ma.initRecodeIDList(rcIDs);
                                encoder.addReplaceLegacyEncoder(ma);
+                               if(DMLScript.STATISTICS)
+                                       Statistics.incTransformEncoderCount(1);
                        }
 
                        // initialize meta data w/ robustness for superset of 
cols
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
index cda6b2a..f77e690 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
@@ -19,21 +19,8 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.functionobjects.Mean;
@@ -44,10 +31,25 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
 import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysds.runtime.util.IndexRange;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 public class EncoderMVImpute extends LegacyEncoder {
        private static final long serialVersionUID = 9057868620144662194L;
        // objects required to compute mean and variance of all non-missing 
entries
@@ -173,6 +175,7 @@ public class EncoderMVImpute extends LegacyEncoder {
 
        @Override
        public void build(FrameBlock in) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                try {
                        for(int j = 0; j < _colList.length; j++) {
                                int colID = _colList[j];
@@ -215,10 +218,13 @@ public class EncoderMVImpute extends LegacyEncoder {
                catch(Exception ex) {
                        throw new RuntimeException(ex);
                }
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformImputeBuildTime(System.nanoTime()-t0);
        }
 
        @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                for(int i = 0; i < in.getNumRows(); i++) {
                        for(int j = 0; j < _colList.length; j++) {
                                int colID = _colList[j];
@@ -226,6 +232,8 @@ public class EncoderMVImpute extends LegacyEncoder {
                                        out.quickSetValue(i, colID - 1, 
Double.parseDouble(_replacementList[j]));
                        }
                }
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformImputeApplyTime(System.nanoTime()-t0);
                return out;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
index db61fc1..c2f3b68 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
@@ -19,16 +19,9 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -37,9 +30,18 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
 import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysds.runtime.util.IndexRange;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
 public class EncoderOmit extends LegacyEncoder {
        /*
         * THIS CLASS IS ONLY FOR LEGACY SUPPORT!!! and will be fazed out 
slowly.
@@ -126,6 +128,7 @@ public class EncoderOmit extends LegacyEncoder {
 
        public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
                // local rmRows for broadcasting encoder in spark
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                boolean[] rmRows;
                if(_federated)
                        rmRows = _rmRows;
@@ -148,7 +151,8 @@ public class EncoderOmit extends LegacyEncoder {
                }
 
                _rmRows = rmRows;
-
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformOmitApplyTime(System.nanoTime()-t0);
                return ret;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 6db63f5..73d33a9 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
@@ -36,22 +38,31 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.DependencyThreadPool;
 import org.apache.sysds.runtime.util.DependencyWrapperTask;
 import org.apache.sysds.runtime.util.IndexRange;
+import org.apache.sysds.utils.Statistics;
 
 public class MultiColumnEncoder implements Encoder {
 
        protected static final Log LOG = 
LogFactory.getLog(MultiColumnEncoder.class.getName());
        private static final boolean MULTI_THREADED = true;
-       public static boolean MULTI_THREADED_STAGES = true;
+       // If true, build and apply are executed as separate stages with a synchronization barrier in between
+       public static boolean MULTI_THREADED_STAGES = false;
+
+       // Only takes effect if MULTI_THREADED_STAGES is true:
+       // if true, the apply tasks of one column complete
+       // before the tasks of the next column start.
+       public static boolean APPLY_ENCODER_SEPARATE_STAGES = false;
 
        private List<ColumnEncoderComposite> _columnEncoders;
        // These encoders are deprecated and will be phased out soon.
@@ -60,18 +71,6 @@ public class MultiColumnEncoder implements Encoder {
        private int _colOffset = 0; // offset for federated Workers who are 
using subrange encoders
        private FrameBlock _meta = null;
 
-       // TEMP CONSTANTS for testing only
-       //private int APPLY_BLOCKSIZE = 0; // temp only for testing until 
automatic calculation of block size
-       public static int BUILD_BLOCKSIZE = 0;
-
-       /*public void setApplyBlockSize(int blk) {
-               APPLY_BLOCKSIZE = blk;
-       }*/
-
-       public void setBuildBlockSize(int blk) {
-               BUILD_BLOCKSIZE = blk;
-       }
-
        public MultiColumnEncoder(List<ColumnEncoderComposite> columnEncoders) {
                _columnEncoders = columnEncoders;
        }
@@ -90,6 +89,7 @@ public class MultiColumnEncoder implements Encoder {
                        if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES && 
!hasLegacyEncoder()) {
                                out = new MatrixBlock();
                                DependencyThreadPool pool = new 
DependencyThreadPool(k);
+                               LOG.debug("Encoding with full DAG on " + k + " 
Threads");
                                try {
                                        
pool.submitAllAndWait(getEncodeTasks(in, out, pool));
                                }
@@ -98,10 +98,10 @@ public class MultiColumnEncoder implements Encoder {
                                        e.printStackTrace();
                                }
                                pool.shutdown();
-                               out.recomputeNonZeros();
-                               return out;
+                               outputMatrixPostProcessing(out);
                        }
                        else {
+                               LOG.debug("Encoding with staged approach on: " 
+ k + " Threads");
                                build(in, k);
                                if(_legacyMVImpute != null) {
                                        // These operations are redundant for 
every encoder excluding the legacyMVImpute, the workaround to
@@ -127,9 +127,10 @@ public class MultiColumnEncoder implements Encoder {
                List<DependencyTask<?>> applyTAgg = null;
                Map<Integer[], Integer[]> depMap = new HashMap<>();
                boolean hasDC = 
getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
+               boolean applyOffsetDep = false;
                tasks.add(DependencyThreadPool.createDependencyTask(new 
InitOutputMatrixTask(this, in, out)));
                for(ColumnEncoderComposite e : _columnEncoders) {
-                       List<DependencyTask<?>> buildTasks = 
e.getBuildTasks(in, BUILD_BLOCKSIZE);
+                       List<DependencyTask<?>> buildTasks = 
e.getBuildTasks(in);
 
                        tasks.addAll(buildTasks);
                        if(buildTasks.size() > 0) {
@@ -152,11 +153,14 @@ public class MultiColumnEncoder implements Encoder {
                                // colUpdateTask can start when all domain 
sizes, because it can now calculate the offsets for
                                // each column
                                depMap.put(new Integer[] {-2, -1}, new 
Integer[] {tasks.size() - 1, tasks.size()});
+                               buildTasks.forEach(t -> t.setPriority(5));
+                               applyOffsetDep = true;
                        }
 
-                       if(hasDC) {
+                       if(hasDC && applyOffsetDep) {
                                // Apply Task dependency to output col update 
task (is last in list)
-                               // All ApplyTasks need to wait for this task so 
they all have the correct offsets.
+                               // All ApplyTasks need to wait for this task, so they all have the correct offsets.
+                               // This only concerns columns after the first DC coder, since earlier columns are not shifted by an offset.
                                depMap.put(new Integer[] {tasks.size(), 
tasks.size() + 1}, new Integer[] {-2, -1});
 
                                applyTAgg = applyTAgg == null ? new 
ArrayList<>() : applyTAgg;
@@ -195,7 +199,7 @@ public class MultiColumnEncoder implements Encoder {
        private List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
-                       tasks.addAll(columnEncoder.getBuildTasks(in, 
BUILD_BLOCKSIZE));
+                       tasks.addAll(columnEncoder.getBuildTasks(in));
                }
                return tasks;
        }
@@ -241,23 +245,23 @@ public class MultiColumnEncoder implements Encoder {
                if(in.getNumColumns() != numEncoders)
                        throw new DMLRuntimeException("Not every column in has 
a CompositeEncoder. Please make sure every column "
                                + "has a encoder or slice the input 
accordingly");
-               // Block allocation for MT access
-               outputMatrixPreProcessing(out, in);
                // TODO smart checks
                if(MULTI_THREADED && k > 1) {
+                       // Block allocation for MT access
+                       outputMatrixPreProcessing(out, in);
                        applyMT(in, out, outputCol, k);
                }
                else {
                        int offset = outputCol;
                        for(ColumnEncoderComposite columnEncoder : 
_columnEncoders) {
                                columnEncoder.apply(in, out, 
columnEncoder._colID - 1 + offset);
-                               
if(columnEncoder.hasEncoder(ColumnEncoderDummycode.class))
+                               if 
(columnEncoder.hasEncoder(ColumnEncoderDummycode.class))
                                        offset += 
columnEncoder.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
                        }
                }
                // Recomputing NNZ since we access the block directly
                // TODO set NNZ explicit count them in the encoders
-               out.recomputeNonZeros();
+               outputMatrixPostProcessing(out);
                if(_legacyOmit != null)
                        out = _legacyOmit.apply(in, out);
                if(_legacyMVImpute != null)
@@ -280,16 +284,26 @@ public class MultiColumnEncoder implements Encoder {
        private void applyMT(FrameBlock in, MatrixBlock out, int outputCol, int 
k) {
                DependencyThreadPool pool = new DependencyThreadPool(k);
                try {
-                       pool.submitAllAndWait(getApplyTasks(in, out, 
outputCol));
+                       if(APPLY_ENCODER_SEPARATE_STAGES){
+                               int offset = outputCol;
+                               for (ColumnEncoderComposite e : 
_columnEncoders) {
+                                       
pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset));
+                                       if 
(e.hasEncoder(ColumnEncoderDummycode.class))
+                                               offset += 
e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
+                               }
+                       }else{
+                               pool.submitAllAndWait(getApplyTasks(in, out, 
outputCol));
+                       }
                }
                catch(ExecutionException | InterruptedException e) {
-                       LOG.error("MT Column encode failed");
+                       LOG.error("MT Column apply failed");
                        e.printStackTrace();
                }
                pool.shutdown();
        }
 
        private static void outputMatrixPreProcessing(MatrixBlock output, 
FrameBlock input) {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                output.allocateBlock();
                if(output.isInSparseFormat()) {
                        SparseBlock block = output.getSparseBlock();
@@ -300,8 +314,33 @@ public class MultiColumnEncoder implements Encoder {
                                // allocate all sparse rows so MT sync can be 
done.
                                // should be rare that rows have only 0
                                block.allocate(r, input.getNumColumns());
+                               // Setting the size here makes it possible to run all sparse apply tasks without any synchronization.
+                               // This could become problematic if the input is very sparse, since each row is allocated one entry per input column,
+                               // but should be fine in theory.
+                               ((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+                       }
+               }
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);
+       }
+
+       private void outputMatrixPostProcessing(MatrixBlock output){
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+               Set<Integer> indexSet = 
getColumnEncoders(ColumnEncoderPassThrough.class).stream()
+                               
.map(ColumnEncoderPassThrough::getSparseRowsWZeros).flatMap(l -> {
+                                       if(l == null)
+                                               return null;
+                                       return l.stream();
+                               }).collect(Collectors.toSet());
+               if(!indexSet.stream().allMatch(Objects::isNull)){
+                       for(Integer row : indexSet){
+                               // TODO: Maybe MT in special cases when the 
number of rows is large
+                               output.getSparseBlock().get(row).compact();
                        }
                }
+               output.recomputeNonZeros();
+               if(DMLScript.STATISTICS)
+                       
Statistics.incTransformOutMatrixPostProcessingTime(System.nanoTime()-t0);
        }
 
        @Override
@@ -660,7 +699,7 @@ public class MultiColumnEncoder implements Encoder {
        }
 
        /*
-        * Currently not in use will be integrated in the future
+        * Currently not in use; will be integrated in the future
         */
        @SuppressWarnings("unused")
        private static class MultiColumnLegacyBuildTask implements 
Callable<Object> {
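
The single-threaded apply path above shifts each subsequent output column by (domainSize - 1) for every dummy-coded column it passes. A small stand-alone illustration of that offset bookkeeping, with made-up domain sizes:

// Illustration of the output-column offset bookkeeping in apply():
// a dummy-coded column with domain size d expands into d output columns,
// so every later column is shifted right by (d - 1). Values are invented.
public class DummycodeOffsetSketch {
	public static void main(String[] args) {
		int[] domainSizes = {3, 1, 4, 1};              // 1 == column is not dummy-coded here
		boolean[] isDummycoded = {true, false, true, false};
		int offset = 0;
		for (int col = 0; col < domainSizes.length; col++) {
			int outputCol = col + offset;              // mirrors colID - 1 + offset in the diff
			System.out.println("input column " + col + " starts at output column " + outputCol);
			if (isDummycoded[col])
				offset += domainSizes[col] - 1;
		}
		// prints output columns 0, 3, 4, 8 for this example
	}
}
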
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java 
b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
index 5aff39b..17351a6 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
@@ -25,16 +25,20 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
 
-public class DependencyTask<E> implements Callable<E> {
+public class DependencyTask<E> implements Comparable<DependencyTask<?>>, 
Callable<E> {
        public static final boolean ENABLE_DEBUG_DATA = false;
+       protected static final Log LOG = 
LogFactory.getLog(DependencyTask.class.getName());
 
        private final Callable<E> _task;
        protected final List<DependencyTask<?>> _dependantTasks;
        public List<DependencyTask<?>> _dependencyTasks = null; // only for 
debugging
        private CompletableFuture<Future<?>> _future;
        private int _rdy = 0;
+       private Integer _priority = 0;
        private ExecutorService _pool;
 
        public DependencyTask(Callable<E> task, List<DependencyTask<?>> 
dependantTasks) {
@@ -54,6 +58,10 @@ public class DependencyTask<E> implements Callable<E> {
                return _rdy == 0;
        }
 
+       public void setPriority(int priority) {
+               _priority = priority;
+       }
+
        private boolean decrease() {
                synchronized(this) {
                        _rdy -= 1;
@@ -68,7 +76,11 @@ public class DependencyTask<E> implements Callable<E> {
 
        @Override
        public E call() throws Exception {
+               LOG.debug("Executing Task: " + this);
+               long t0 = System.nanoTime();
                E ret = _task.call();
+               LOG.debug("Finished Task: " + this + " in: " +
+                               String.format("%.3f", (System.nanoTime()-t0)*1e-9) + " sec.");
                _dependantTasks.forEach(t -> {
                        if(t.decrease()) {
                                if(_pool == null)
@@ -79,4 +91,14 @@ public class DependencyTask<E> implements Callable<E> {
 
                return ret;
        }
+
+       @Override
+       public String toString(){
+               return _task.toString() + "<Prio: " + _priority + ">" + 
"<Waiting: " + _dependantTasks.size() + ">";
+       }
+
+       @Override
+       public int compareTo(DependencyTask<?> task) {
+               return -1 * this._priority.compareTo(task._priority);
+       }
 }
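
compareTo above negates the priority comparison, so sorting a task list puts higher-priority tasks first (e.g., the dummy-code build tasks that receive priority 5 in MultiColumnEncoder). A minimal stand-alone demonstration of that ordering:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal demonstration: a comparable task whose compareTo negates the
// priority comparison, so Collections.sort() yields descending priority.
public class PrioritySketch implements Comparable<PrioritySketch> {
	final String name;
	final Integer priority;

	PrioritySketch(String name, int priority) {
		this.name = name;
		this.priority = priority;
	}

	@Override
	public int compareTo(PrioritySketch other) {
		return -1 * this.priority.compareTo(other.priority);
	}

	public static void main(String[] args) {
		List<PrioritySketch> tasks = new ArrayList<>();
		tasks.add(new PrioritySketch("recode build", 0));
		tasks.add(new PrioritySketch("dummycode build", 5));
		tasks.add(new PrioritySketch("apply", 0));
		Collections.sort(tasks);
		// the dummycode build task is scheduled first
		tasks.forEach(t -> System.out.println(t.name + " (prio " + t.priority + ")"));
	}
}
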
diff --git 
a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java 
b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
index 4fdd63a..50675d6 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
@@ -19,7 +19,12 @@
 
 package org.apache.sysds.runtime.util;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +36,10 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.sysds.runtime.DMLRuntimeException;
-
 
 public class DependencyThreadPool {
 
+       protected static final Log LOG = 
LogFactory.getLog(DependencyThreadPool.class.getName());
        private final ExecutorService _pool;
 
        public DependencyThreadPool(int k) {
@@ -50,6 +54,8 @@ public class DependencyThreadPool {
                List<Future<Future<?>>> futures = new ArrayList<>();
                List<Integer> rdyTasks = new ArrayList<>();
                int i = 0;
+               // sort by priority
+               Collections.sort(dtasks);
                for(DependencyTask<?> t : dtasks) {
                        CompletableFuture<Future<?>> f = new 
CompletableFuture<>();
                        t.addPool(_pool);
@@ -63,6 +69,8 @@ public class DependencyThreadPool {
                        futures.add(f);
                        i++;
                }
+               LOG.debug("Initial Starting tasks: \n\t" +
+                               rdyTasks.stream().map(index -> 
dtasks.get(index).toString()).collect(Collectors.joining("\n\t")));
                // Two stages to avoid race condition!
                for(Integer index : rdyTasks) {
                        synchronized(_pool) {
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index de6d7d6..7431a82 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -989,6 +989,15 @@ public class UtilFunctions {
                return (blockSize <= 0)? arrayLength: Math.min(arrayLength, 
startIndex + blockSize);
        }
 
+       public static int[] getBlockSizes(int num, int numBlocks){
+               int[] blockSizes = new int[numBlocks];
+               Arrays.fill(blockSizes, num/numBlocks);
+               for (int i = 0; i < num%numBlocks; i++){
+                       blockSizes[i]++;
+               }
+               return blockSizes;
+       }
+
        public static String[] splitRecodeEntry(String s) {
                //forward to column encoder, as UtilFunctions available in map 
context
                return ColumnEncoderRecode.splitRecodeMapEntry(s);
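
getBlockSizes splits num rows as evenly as possible: every block gets num/numBlocks rows and the first num%numBlocks blocks get one extra row, e.g., 10 rows over 3 blocks gives {4, 3, 3}. A quick stand-alone check (the method is re-stated locally so the snippet compiles on its own):

import java.util.Arrays;

// Re-statement of UtilFunctions.getBlockSizes for a quick local check.
public class BlockSizeSketch {
	static int[] getBlockSizes(int num, int numBlocks) {
		int[] blockSizes = new int[numBlocks];
		Arrays.fill(blockSizes, num / numBlocks);
		for (int i = 0; i < num % numBlocks; i++)
			blockSizes[i]++;
		return blockSizes;
	}

	public static void main(String[] args) {
		System.out.println(Arrays.toString(getBlockSizes(10, 3))); // [4, 3, 3]
	}
}
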
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java 
b/src/main/java/org/apache/sysds/utils/Statistics.java
index cacd2f7..d91d9c5 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -19,20 +19,6 @@
 
 package org.apache.sysds.utils;
 
-import java.lang.management.CompilationMXBean;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.text.DecimalFormat;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.DoubleAdder;
-import java.util.concurrent.atomic.LongAdder;
-
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.api.DMLScript;
@@ -51,6 +37,20 @@ import 
org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
 import org.apache.sysds.runtime.privacy.CheckedConstraintsLog;
 
+import java.lang.management.CompilationMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.DoubleAdder;
+import java.util.concurrent.atomic.LongAdder;
+
 /**
  * This class captures all statistics.
  */
@@ -149,7 +149,28 @@ public class Statistics
        private static final LongAdder lTotalUIPVar = new LongAdder();
        private static final LongAdder lTotalLix = new LongAdder();
        private static final LongAdder lTotalLixUIP = new LongAdder();
-       
+
+       // Transformencode stats
+       private static final LongAdder transformEncoderCount = new LongAdder();
+
+       //private static final LongAdder transformBuildTime = new LongAdder();
+       private static final LongAdder transformRecodeBuildTime = new 
LongAdder();
+       private static final LongAdder transformBinningBuildTime = new 
LongAdder();
+       private static final LongAdder transformImputeBuildTime = new 
LongAdder();
+
+       //private static final LongAdder transformApplyTime = new LongAdder();
+       private static final LongAdder transformRecodeApplyTime = new 
LongAdder();
+       private static final LongAdder transformDummyCodeApplyTime = new 
LongAdder();
+       private static final LongAdder transformPassThroughApplyTime = new 
LongAdder();
+       private static final LongAdder transformFeatureHashingApplyTime = new 
LongAdder();
+       private static final LongAdder transformBinningApplyTime = new 
LongAdder();
+       private static final LongAdder transformOmitApplyTime = new LongAdder();
+       private static final LongAdder transformImputeApplyTime = new 
LongAdder();
+
+
+       private static final LongAdder transformOutMatrixPreProcessingTime = 
new LongAdder();
+       private static final LongAdder transformOutMatrixPostProcessingTime = 
new LongAdder();
+
        // Federated stats
        private static final LongAdder federatedReadCount = new LongAdder();
        private static final LongAdder federatedPutCount = new LongAdder();
@@ -649,6 +670,70 @@ public class Statistics
 
        public static void accFedPSCommunicationTime(long t) { 
fedPSCommunicationTime.add(t);}
 
+       public static void incTransformEncoderCount(long encoders){
+               transformEncoderCount.add(encoders);
+       }
+
+       public static void incTransformRecodeApplyTime(long t){
+               transformRecodeApplyTime.add(t);
+       }
+
+       public static void incTransformDummyCodeApplyTime(long t){
+               transformDummyCodeApplyTime.add(t);
+       }
+
+       public static void incTransformBinningApplyTime(long t){
+               transformBinningApplyTime.add(t);
+       }
+
+       public static void incTransformPassThroughApplyTime(long t){
+               transformPassThroughApplyTime.add(t);
+       }
+
+       public static void incTransformFeatureHashingApplyTime(long t){
+               transformFeatureHashingApplyTime.add(t);
+       }
+
+       public static void incTransformOmitApplyTime(long t) {
+               transformOmitApplyTime.add(t);
+       }
+
+       public static void incTransformImputeApplyTime(long t) {
+               transformImputeApplyTime.add(t);
+       }
+
+       public static void incTransformRecodeBuildTime(long t){
+               transformRecodeBuildTime.add(t);
+       }
+
+       public static void incTransformBinningBuildTime(long t){
+               transformBinningBuildTime.add(t);
+       }
+
+       public static void incTransformImputeBuildTime(long t) {
+               transformImputeBuildTime.add(t);
+       }
+
+       public static void incTransformOutMatrixPreProcessingTime(long t){
+               transformOutMatrixPreProcessingTime.add(t);
+       }
+
+       public static void incTransformOutMatrixPostProcessingTime(long t){
+               transformOutMatrixPostProcessingTime.add(t);
+       }
+
+       public static long getTransformEncodeBuildTime(){
+               return transformBinningBuildTime.longValue() + 
transformImputeBuildTime.longValue() +
+                               transformRecodeBuildTime.longValue();
+       }
+
+       public static long getTransformEncodeApplyTime(){
+               return transformDummyCodeApplyTime.longValue() + 
transformBinningApplyTime.longValue() +
+                               transformFeatureHashingApplyTime.longValue() + 
transformPassThroughApplyTime.longValue() +
+                               transformRecodeApplyTime.longValue() + 
transformOmitApplyTime.longValue() +
+                               transformImputeApplyTime.longValue();
+       }
+
        public static String getCPHeavyHitterCode( Instruction inst )
        {
                String opcode = null;
@@ -1129,6 +1214,50 @@ public class Statistics
                                        
federatedExecuteInstructionCount.longValue() + "/" +
                                        federatedExecuteUDFCount.longValue() + 
".\n");
                        }
+                       if( transformEncoderCount.longValue() > 0) {
+                               //TODO: Cleanup and condense
+                               sb.append("TransformEncode num. 
encoders:\t").append(transformEncoderCount.longValue()).append("\n");
+                               sb.append("TransformEncode build 
time:\t").append(String.format("%.3f",
+                                               
getTransformEncodeBuildTime()*1e-9)).append(" sec.\n");
+                               if(transformRecodeBuildTime.longValue() > 0)
+                                       sb.append("\tRecode build 
time:\t").append(String.format("%.3f",
+                                                       
transformRecodeBuildTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformBinningBuildTime.longValue() > 0)
+                                       sb.append("\tBinning build 
time:\t").append(String.format("%.3f",
+                                                       
transformBinningBuildTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformImputeBuildTime.longValue() > 0)
+                                       sb.append("\tImpute build 
time:\t").append(String.format("%.3f",
+                                                       
transformImputeBuildTime.longValue()*1e-9)).append(" sec.\n");
+
+                               sb.append("TransformEncode apply 
time:\t").append(String.format("%.3f",
+                                               
getTransformEncodeApplyTime()*1e-9)).append(" sec.\n");
+                               if(transformRecodeApplyTime.longValue() > 0)
+                                       sb.append("\tRecode apply 
time:\t").append(String.format("%.3f",
+                                                       
transformRecodeApplyTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformBinningApplyTime.longValue() > 0)
+                                       sb.append("\tBinning apply 
time:\t").append(String.format("%.3f",
+                                                       
transformBinningApplyTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformDummyCodeApplyTime.longValue() > 0)
+                                       sb.append("\tDummyCode apply 
time:\t").append(String.format("%.3f",
+                                                       
transformDummyCodeApplyTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformFeatureHashingApplyTime.longValue() 
> 0)
+                                       sb.append("\tHashing apply 
time:\t").append(String.format("%.3f",
+                                                       
transformFeatureHashingApplyTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformPassThroughApplyTime.longValue() > 
0)
+                                       sb.append("\tPassThrough apply 
time:\t").append(String.format("%.3f",
+                                                       
transformPassThroughApplyTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformOmitApplyTime.longValue() > 0)
+                                       sb.append("\tOmit apply 
time:\t").append(String.format("%.3f",
+                                                       
transformOmitApplyTime.longValue()*1e-9)).append(" sec.\n");
+                               if(transformImputeApplyTime.longValue() > 0)
+                                       sb.append("\tImpute apply 
time:\t").append(String.format("%.3f",
+                                                       
transformImputeApplyTime.longValue()*1e-9)).append(" sec.\n");
+
+                               sb.append("TransformEncode PreProc. 
time:\t").append(String.format("%.3f",
+                                               
transformOutMatrixPreProcessingTime.longValue()*1e-9)).append(" sec.\n");
+                               sb.append("TransformEncode PostProc. 
time:\t").append(String.format("%.3f",
+                                               
transformOutMatrixPostProcessingTime.longValue()*1e-9)).append(" sec.\n");
+                       }
 
                        if(ConfigurationManager.isCompressionEnabled()){
                                DMLCompressionStatistics.display(sb);
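
All new counters follow the same pattern: the timestamp is only taken when DMLScript.STATISTICS is set, and the elapsed time is accumulated in a LongAdder so concurrent build/apply tasks can add without contention. A generic stand-alone sketch of that pattern; the flag and field names here are placeholders, not the actual SystemDS fields:

import java.util.concurrent.atomic.LongAdder;

// Generic sketch of the guarded-timer pattern used by the transform statistics.
// STATISTICS and applyTime are placeholders, not the actual SystemDS fields.
public class TimerSketch {
	static final boolean STATISTICS = true;
	static final LongAdder applyTime = new LongAdder();

	static void doApply() throws InterruptedException {
		long t0 = STATISTICS ? System.nanoTime() : 0;
		Thread.sleep(5); // stand-in for the actual apply work
		if (STATISTICS)
			applyTime.add(System.nanoTime() - t0);
	}

	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 3; i++)
			doApply();
		System.out.println(String.format("apply time:\t%.3f sec.", applyTime.longValue() * 1e-9));
	}
}
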
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
index 8824b9d..b70571b 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
@@ -30,6 +30,7 @@ import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FrameReaderFactory;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.transform.encode.ColumnEncoder;
 import org.apache.sysds.runtime.transform.encode.ColumnEncoderBin;
 import org.apache.sysds.runtime.transform.encode.ColumnEncoderRecode;
 import org.apache.sysds.runtime.transform.encode.EncoderFactory;
@@ -173,6 +174,7 @@ public class TransformFrameBuildMultithreadedTest extends 
AutomatedTestBase {
                                .readFrameFromHDFS(DATASET, -1L, -1L);
                        StringBuilder specSb = new StringBuilder();
                        Files.readAllLines(Paths.get(SPEC)).forEach(s -> 
specSb.append(s).append("\n"));
+                       ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = 
Math.max(blockSize, 1);
                        MultiColumnEncoder encoderS = 
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
                                input.getNumColumns(), null);
                        MultiColumnEncoder encoderM = 
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
index 2156250..6679f36 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
@@ -48,7 +48,7 @@ public class TransformFrameEncodeMultithreadedTest extends 
AutomatedTestBase {
        private final static String DATASET1 = "homes3/homes.csv";
        private final static String SPEC1 = "homes3/homes.tfspec_recode.json";
        private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
-       private final static String SPEC2all = 
"homes3/homes.tfspec_dummy_all.json";
+       private final static String SPEC2sparse = 
"homes3/homes.tfspec_dummy_sparse.json";
        private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // 
recode
        private final static String SPEC6 = 
"homes3/homes.tfspec_recode_dummy.json";
        private final static String SPEC7 = 
"homes3/homes.tfspec_binDummy.json"; // recode+dummy
@@ -164,7 +164,7 @@ public class TransformFrameEncodeMultithreadedTest extends 
AutomatedTestBase {
                                DATASET = DATASET1;
                                break;
                        case DUMMY_ALL:
-                               SPEC = SPEC2all;
+                               SPEC = SPEC2sparse;
                                DATASET = DATASET1;
                                break;
                        case BIN:
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json 
b/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json
deleted file mode 100644
index 65b8fee..0000000
--- a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json
+++ /dev/null
@@ -1 +0,0 @@
-{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 5, 6, 8, 9 ] }
\ No newline at end of file
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json 
b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json
new file mode 100644
index 0000000..ed48308
--- /dev/null
+++ b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json
@@ -0,0 +1 @@
+{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 6, 8, 9 ] }
\ No newline at end of file
