This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new dd11429  [SYSTEMDS-3222] Multi-threaded metadata collection for transformencode
dd11429 is described below

commit dd11429686281defc2987433085b96f3d89abad8
Author: arnabp <[email protected]>
AuthorDate: Mon Nov 15 22:20:01 2021 +0100

    [SYSTEMDS-3222] Multi-threaded metadata collection for transformencode
    
    This patch adds the initial multi-threaded getMetaData(). getMetaData
    is not part of the transformencode task-graph for now.
    Additionally, this patch fixes some typos.
---
 .../runtime/compress/CompressedMatrixBlock.java    |  4 +-
 ...ltiReturnParameterizedBuiltinCPInstruction.java |  3 +-
 .../sysds/runtime/matrix/data/MatrixBlock.java     | 10 ++--
 .../runtime/transform/encode/ColumnEncoderBin.java |  5 ++
 .../transform/encode/ColumnEncoderComposite.java   |  8 +++
 .../transform/encode/ColumnEncoderDummycode.java   |  5 ++
 .../transform/encode/ColumnEncoderFeatureHash.java |  6 ++
 .../transform/encode/ColumnEncoderPassThrough.java |  6 ++
 .../transform/encode/ColumnEncoderRecode.java      | 20 +++++++
 .../sysds/runtime/transform/encode/Encoder.java    |  6 ++
 .../transform/encode/MultiColumnEncoder.java       | 64 +++++++++++++++++++++-
 .../TransformFrameEncodeMultithreadedTest.java     |  2 +-
 12 files changed, 128 insertions(+), 11 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 162d919..654be34 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -1076,9 +1076,9 @@ public class CompressedMatrixBlock extends MatrixBlock {
                }
 
                if(m2 instanceof CompressedMatrixBlock)
-                       m2 = ((CompressedMatrixBlock) 
m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName());
+                       m2 = ((CompressedMatrixBlock) 
m2).getUncompressed("Ternary Operator arg2 " + 
op.fn.getClass().getSimpleName());
                if(m3 instanceof CompressedMatrixBlock)
-                       m3 = ((CompressedMatrixBlock) 
m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName());
+                       m3 = ((CompressedMatrixBlock) 
m3).getUncompressed("Ternary Operator arg3 " + 
op.fn.getClass().getSimpleName());
 
                if(s2 != s3 && (op.fn instanceof PlusMultiply || op.fn 
instanceof MinusMultiply)) {
                        // SPECIAL CASE for sparse-dense combinations of common 
+* and -*
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index 14dbc3a..eda8385 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -88,7 +88,8 @@ public class MultiReturnParameterizedBuiltinCPInstruction 
extends ComputationCPI
                MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, 
colnames, fin.getNumColumns(), null);
                // TODO: Assign #threads in compiler and pass via the 
instruction string
                MatrixBlock data = encoder.encode(fin, 
OptimizerUtils.getTransformNumThreads()); // build and apply
-               FrameBlock meta = encoder.getMetaData(new 
FrameBlock(fin.getNumColumns(), ValueType.STRING));
+               FrameBlock meta = encoder.getMetaData(new 
FrameBlock(fin.getNumColumns(), ValueType.STRING), 
+                               OptimizerUtils.getTransformNumThreads());
                meta.setColumnNames(colnames);
 
                // release input and outputs
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 2f521b5..d77e7af 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -3032,10 +3032,10 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
 
                        if(m2 instanceof CompressedMatrixBlock)
                                m2 = ((CompressedMatrixBlock) m2)
-                                       .getUncompressed("Ternay Operator arg2 
" + op.fn.getClass().getSimpleName());
+                                       .getUncompressed("Ternary Operator arg2 
" + op.fn.getClass().getSimpleName());
                        if(m3 instanceof CompressedMatrixBlock)
                                m3 = ((CompressedMatrixBlock) m3)
-                                       .getUncompressed("Ternay Operator arg3 
" + op.fn.getClass().getSimpleName());
+                                       .getUncompressed("Ternary Operator arg3 
" + op.fn.getClass().getSimpleName());
 
                        ret.reset(m, n, sparseOutput);
 
@@ -5102,11 +5102,11 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
        public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, 
MatrixBlock m2, MatrixBlock m3, MatrixBlock ret,
                        AggregateTernaryOperator op, boolean inCP) {
                if(m1 instanceof CompressedMatrixBlock)
-                       m1 = ((CompressedMatrixBlock) 
m1).getUncompressed("Aggregate Ternay Operator arg1 " + 
op.getClass().getSimpleName());
+                       m1 = ((CompressedMatrixBlock) 
m1).getUncompressed("Aggregate Ternary Operator arg1 " + 
op.getClass().getSimpleName());
                if(m2 instanceof CompressedMatrixBlock)
-                       m2 = ((CompressedMatrixBlock) 
m2).getUncompressed("Aggregate Ternay Operator arg2 " + 
op.getClass().getSimpleName());
+                       m2 = ((CompressedMatrixBlock) 
m2).getUncompressed("Aggregate Ternary Operator arg2 " + 
op.getClass().getSimpleName());
                if(m3 instanceof CompressedMatrixBlock)
-                       m3 = ((CompressedMatrixBlock) 
m3).getUncompressed("Aggregate Ternay Operator arg3 " + 
op.getClass().getSimpleName());
+                       m3 = ((CompressedMatrixBlock) 
m3).getUncompressed("Aggregate Ternary Operator arg3 " + 
op.getClass().getSimpleName());
 
                //create output matrix block w/ corrections
                int rl = (op.indexFn instanceof ReduceRow) ? 2 : 1;
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index a201448..ab9f662 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -205,6 +205,11 @@ public class ColumnEncoderBin extends ColumnEncoder {
        }
 
        @Override
+       public void allocateMetaData(FrameBlock meta) {
+               meta.ensureAllocatedColumns(_binMaxs.length);
+       }
+
+       @Override
        public FrameBlock getMetaData(FrameBlock meta) {
                // allocate frame if necessary
                meta.ensureAllocatedColumns(_binMaxs.length);
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 4384bae..1f5fce6 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -269,6 +269,14 @@ public class ColumnEncoderComposite extends ColumnEncoder {
        }
 
        @Override
+       public void allocateMetaData(FrameBlock meta) {
+               if(_meta != null)
+                       return;
+               for(ColumnEncoder columnEncoder : _columnEncoders)
+                       columnEncoder.allocateMetaData(meta);
+       }
+
+       @Override
        public FrameBlock getMetaData(FrameBlock out) {
                if(_meta != null)
                        return _meta;
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index d8fcd47..1ea4933 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -189,6 +189,11 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
        }
 
        @Override
+       public void allocateMetaData(FrameBlock meta) {
+               return;
+       }
+
+       @Override
        public FrameBlock getMetaData(FrameBlock meta) {
                return meta;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 013deac..922b5a8 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -99,6 +99,12 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
        }
 
        @Override
+       public void allocateMetaData(FrameBlock meta) {
+               if (isApplicable())
+                       meta.ensureAllocatedColumns(1);
+       }
+
+       @Override
        public FrameBlock getMetaData(FrameBlock meta) {
                if(!isApplicable())
                        return meta;
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index f49747e..bc20fe3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -100,6 +100,12 @@ public class ColumnEncoderPassThrough extends 
ColumnEncoder {
        }
 
        @Override
+       public void allocateMetaData(FrameBlock meta) {
+               // do nothing
+               return;
+       }
+
+       @Override
        public FrameBlock getMetaData(FrameBlock meta) {
                // do nothing
                return meta;
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index 2371789..8246f97 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -178,6 +178,20 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                long code = lookupRCDMap(key);
                return (code < 0) ? Double.NaN : code;
        }
+       
+       protected double[] getCodeCol(CacheBlock in) {
+               Object[] coldata = (Object[]) 
((FrameBlock)in).getColumnData(_colID-1);
+               double codes[] = new double[in.getNumRows()];
+               for (int i=0; i<coldata.length; i++) {
+                       Object okey = coldata[i]; 
+                       String key = (okey != null) ? okey.toString() : null;
+                       if(key == null || key.isEmpty())
+                               codes[i] = Double.NaN;
+                       long code = lookupRCDMap(key);
+                       codes[i] = code;
+               }
+               return codes;
+       }
 
        @Override
        public void prepareBuildPartial() {
@@ -231,6 +245,12 @@ public class ColumnEncoderRecode extends ColumnEncoder {
        public int getNumDistinctValues() {
                return _rcdMap.size();
        }
+       
+       @Override
+       public void allocateMetaData(FrameBlock meta) {
+               // allocate output rows
+               meta.ensureAllocatedColumns(getNumDistinctValues());
+       }
 
        @Override
        public FrameBlock getMetaData(FrameBlock meta) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
index 3446fb1..075919e 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
@@ -47,6 +47,12 @@ public interface Encoder extends Externalizable {
         * @return output matrix block
         */
        MatrixBlock apply(CacheBlock in, MatrixBlock out, int outputCol);
+       
+       /** 
+        * Pre-allocate a FrameBlock for metadata collection.
+        * @param meta      frame block
+        */
+       void allocateMetaData(FrameBlock meta);
 
        /**
         * Construct a frame block out of the transform meta data.
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 4f5c2af..f593487 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -32,6 +32,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -48,6 +50,7 @@ import org.apache.sysds.runtime.data.SparseBlockMCSR;
 import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.DependencyThreadPool;
 import org.apache.sysds.runtime.util.DependencyWrapperTask;
@@ -103,7 +106,10 @@ public class MultiColumnEncoder implements Encoder {
                        }
                        else {
                                LOG.debug("Encoding with staged approach on: " 
+ k + " Threads");
+                               long t0 = System.nanoTime();
                                build(in, k);
+                               long t1 = System.nanoTime();
+                               LOG.debug("Elapsed time for build phase: "+ 
((double) t1 - t0) / 1000000 + " ms");
                                if(_legacyMVImpute != null) {
                                        // These operations are redundant for 
every encoder excluding the legacyMVImpute, the workaround to
                                        // fix it for this encoder would be 
very dirty. This will only have a performance impact if there
@@ -113,7 +119,10 @@ public class MultiColumnEncoder implements Encoder {
                                        initMetaData(_meta);
                                }
                                // apply meta data
+                               t0 = System.nanoTime();
                                out = apply(in, k);
+                               t1 = System.nanoTime();
+                               LOG.debug("Elapsed time for apply phase: "+ 
((double) t1 - t0) / 1000000 + " ms");
                        }
                }
                catch(Exception ex) {
@@ -351,15 +360,50 @@ public class MultiColumnEncoder implements Encoder {
        }
 
        @Override
+       public void allocateMetaData(FrameBlock meta) {
+               for(ColumnEncoder columnEncoder : _columnEncoders) {
+                       columnEncoder.allocateMetaData(meta);
+               }
+       }
+
+       @Override
        public FrameBlock getMetaData(FrameBlock meta) {
+               getMetaData(meta, 1);
+               return meta;
+       }
+
+       public FrameBlock getMetaData(FrameBlock meta, int k) {
+               long t0 = System.nanoTime();
                if(_meta != null)
                        return _meta;
-               for(ColumnEncoder columnEncoder : _columnEncoders)
-                       columnEncoder.getMetaData(meta);
+               this.allocateMetaData(meta);
+               if (k > 1) {
+                       try {
+                               ExecutorService pool = CommonThreadPool.get(k);
+                               ArrayList<ColumnMetaDataTask<? extends 
ColumnEncoder>> tasks = new ArrayList<>();
+                               for(ColumnEncoder columnEncoder : 
_columnEncoders)
+                                       tasks.add(new 
ColumnMetaDataTask<>(columnEncoder, meta));
+                               List<Future<Object>> taskret = 
pool.invokeAll(tasks);
+                               pool.shutdown();
+                               for (Future<Object> task : taskret)
+                                       task.get();
+                       }
+                       catch(Exception ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
+               }
+               else {
+                       for(ColumnEncoder columnEncoder : _columnEncoders)
+                               columnEncoder.getMetaData(meta);
+               }
+
+               //_columnEncoders.stream().parallel().forEach(columnEncoder -> 
+               //              columnEncoder.getMetaData(meta));
                if(_legacyOmit != null)
                        _legacyOmit.getMetaData(meta);
                if(_legacyMVImpute != null)
                        _legacyMVImpute.getMetaData(meta);
+               LOG.debug("Time spent getting metadata "+((double) 
System.nanoTime() - t0) / 1000000 + " ms");
                return meta;
        }
 
@@ -852,5 +896,21 @@ public class MultiColumnEncoder implements Encoder {
                        return null;
                }
        }
+       
+       private static class ColumnMetaDataTask<T extends ColumnEncoder> 
implements Callable<Object> {
+               private final T _colEncoder;
+               private final FrameBlock _out;
+
+               protected ColumnMetaDataTask(T encoder, FrameBlock out) {
+                       _colEncoder = encoder;
+                       _out = out;
+               }
+
+               @Override
+               public Object call() throws Exception {
+                       _colEncoder.getMetaData(_out);
+                       return null;
+               }
+       }
 
 }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
index bad4a5e..fbf7111 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
@@ -208,7 +208,7 @@ public class TransformFrameEncodeMultithreadedTest extends 
AutomatedTestBase {
                        Files.readAllLines(Paths.get(SPEC)).forEach(s -> 
specSb.append(s).append("\n"));
                        MultiColumnEncoder encoder = 
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
                                input.getNumColumns(), null);
-                       //MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
+                       MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
 
                        MatrixBlock outputS = encoder.encode(input, 1);
                        MatrixBlock outputM = encoder.encode(input, 12);

Reply via email to