This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new dd11429 [SYSTEMDS-3222] Multi-threaded metadata collection for transformencode
dd11429 is described below
commit dd11429686281defc2987433085b96f3d89abad8
Author: arnabp <[email protected]>
AuthorDate: Mon Nov 15 22:20:01 2021 +0100
[SYSTEMDS-3222] Multi-threaded metadata collection for transformencode
This patch adds an initial multi-threaded implementation of getMetaData().
For now, getMetaData is not part of the transformencode task graph.
Additionally, this patch fixes some typos ("Ternay" -> "Ternary").
---
.../runtime/compress/CompressedMatrixBlock.java | 4 +-
...ltiReturnParameterizedBuiltinCPInstruction.java | 3 +-
.../sysds/runtime/matrix/data/MatrixBlock.java | 10 ++--
.../runtime/transform/encode/ColumnEncoderBin.java | 5 ++
.../transform/encode/ColumnEncoderComposite.java | 8 +++
.../transform/encode/ColumnEncoderDummycode.java | 5 ++
.../transform/encode/ColumnEncoderFeatureHash.java | 6 ++
.../transform/encode/ColumnEncoderPassThrough.java | 6 ++
.../transform/encode/ColumnEncoderRecode.java | 20 +++++++
.../sysds/runtime/transform/encode/Encoder.java | 6 ++
.../transform/encode/MultiColumnEncoder.java | 64 +++++++++++++++++++++-
.../TransformFrameEncodeMultithreadedTest.java | 2 +-
12 files changed, 128 insertions(+), 11 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 162d919..654be34 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -1076,9 +1076,9 @@ public class CompressedMatrixBlock extends MatrixBlock {
}
if(m2 instanceof CompressedMatrixBlock)
- m2 = ((CompressedMatrixBlock)
m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName());
+ m2 = ((CompressedMatrixBlock)
m2).getUncompressed("Ternary Operator arg2 " +
op.fn.getClass().getSimpleName());
if(m3 instanceof CompressedMatrixBlock)
- m3 = ((CompressedMatrixBlock)
m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName());
+ m3 = ((CompressedMatrixBlock)
m3).getUncompressed("Ternary Operator arg3 " +
op.fn.getClass().getSimpleName());
if(s2 != s3 && (op.fn instanceof PlusMultiply || op.fn
instanceof MinusMultiply)) {
// SPECIAL CASE for sparse-dense combinations of common
+* and -*
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index 14dbc3a..eda8385 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -88,7 +88,8 @@ public class MultiReturnParameterizedBuiltinCPInstruction
extends ComputationCPI
MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec,
colnames, fin.getNumColumns(), null);
// TODO: Assign #threads in compiler and pass via the
instruction string
MatrixBlock data = encoder.encode(fin,
OptimizerUtils.getTransformNumThreads()); // build and apply
- FrameBlock meta = encoder.getMetaData(new
FrameBlock(fin.getNumColumns(), ValueType.STRING));
+ FrameBlock meta = encoder.getMetaData(new
FrameBlock(fin.getNumColumns(), ValueType.STRING),
+ OptimizerUtils.getTransformNumThreads());
meta.setColumnNames(colnames);
// release input and outputs
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 2f521b5..d77e7af 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -3032,10 +3032,10 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
if(m2 instanceof CompressedMatrixBlock)
m2 = ((CompressedMatrixBlock) m2)
- .getUncompressed("Ternay Operator arg2
" + op.fn.getClass().getSimpleName());
+ .getUncompressed("Ternary Operator arg2
" + op.fn.getClass().getSimpleName());
if(m3 instanceof CompressedMatrixBlock)
m3 = ((CompressedMatrixBlock) m3)
- .getUncompressed("Ternay Operator arg3
" + op.fn.getClass().getSimpleName());
+ .getUncompressed("Ternary Operator arg3
" + op.fn.getClass().getSimpleName());
ret.reset(m, n, sparseOutput);
@@ -5102,11 +5102,11 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
public MatrixBlock aggregateTernaryOperations(MatrixBlock m1,
MatrixBlock m2, MatrixBlock m3, MatrixBlock ret,
AggregateTernaryOperator op, boolean inCP) {
if(m1 instanceof CompressedMatrixBlock)
- m1 = ((CompressedMatrixBlock)
m1).getUncompressed("Aggregate Ternay Operator arg1 " +
op.getClass().getSimpleName());
+ m1 = ((CompressedMatrixBlock)
m1).getUncompressed("Aggregate Ternary Operator arg1 " +
op.getClass().getSimpleName());
if(m2 instanceof CompressedMatrixBlock)
- m2 = ((CompressedMatrixBlock)
m2).getUncompressed("Aggregate Ternay Operator arg2 " +
op.getClass().getSimpleName());
+ m2 = ((CompressedMatrixBlock)
m2).getUncompressed("Aggregate Ternary Operator arg2 " +
op.getClass().getSimpleName());
if(m3 instanceof CompressedMatrixBlock)
- m3 = ((CompressedMatrixBlock)
m3).getUncompressed("Aggregate Ternay Operator arg3 " +
op.getClass().getSimpleName());
+ m3 = ((CompressedMatrixBlock)
m3).getUncompressed("Aggregate Ternary Operator arg3 " +
op.getClass().getSimpleName());
//create output matrix block w/ corrections
int rl = (op.indexFn instanceof ReduceRow) ? 2 : 1;
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index a201448..ab9f662 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -205,6 +205,11 @@ public class ColumnEncoderBin extends ColumnEncoder {
}
@Override
+ public void allocateMetaData(FrameBlock meta) {
+     // same sizing as getMetaData: one entry per element of _binMaxs
+     final int numBins = _binMaxs.length;
+     meta.ensureAllocatedColumns(numBins);
+ }
+
+ @Override
public FrameBlock getMetaData(FrameBlock meta) {
// allocate frame if necessary
meta.ensureAllocatedColumns(_binMaxs.length);
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 4384bae..1f5fce6 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -269,6 +269,14 @@ public class ColumnEncoderComposite extends ColumnEncoder {
}
@Override
+ public void allocateMetaData(FrameBlock meta) {
+     // an already available _meta frame needs no allocation (getMetaData
+     // returns it directly); otherwise every wrapped encoder sizes its part
+     if(_meta == null) {
+         for(ColumnEncoder encoder : _columnEncoders)
+             encoder.allocateMetaData(meta);
+     }
+ }
+
+ @Override
public FrameBlock getMetaData(FrameBlock out) {
if(_meta != null)
return _meta;
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index d8fcd47..1ea4933 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -189,6 +189,11 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
}
@Override
+ public void allocateMetaData(FrameBlock meta) {
+     // dummycoding writes no meta data (getMetaData returns meta unchanged),
+     // so there is nothing to pre-allocate
+ }
+
+ @Override
public FrameBlock getMetaData(FrameBlock meta) {
return meta;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 013deac..922b5a8 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -99,6 +99,12 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
}
@Override
+ public void allocateMetaData(FrameBlock meta) {
+     // mirrors the isApplicable() guard of getMetaData: non-applicable
+     // encoders leave the frame untouched
+     if(!isApplicable())
+         return;
+     meta.ensureAllocatedColumns(1);
+ }
+
+ @Override
public FrameBlock getMetaData(FrameBlock meta) {
if(!isApplicable())
return meta;
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index f49747e..bc20fe3 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -100,6 +100,12 @@ public class ColumnEncoderPassThrough extends
ColumnEncoder {
}
@Override
+ public void allocateMetaData(FrameBlock meta) {
+     // pass-through columns carry no meta data (getMetaData returns meta
+     // unchanged), so nothing needs to be allocated
+ }
+
+ @Override
public FrameBlock getMetaData(FrameBlock meta) {
// do nothing
return meta;
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index 2371789..8246f97 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -178,6 +178,20 @@ public class ColumnEncoderRecode extends ColumnEncoder {
long code = lookupRCDMap(key);
return (code < 0) ? Double.NaN : code;
}
+
+ /**
+  * Look up the recode codes of all rows of this encoder's column.
+  * Null, empty, and unknown tokens are mapped to NaN, consistent with the
+  * per-row code lookup of this encoder.
+  *
+  * @param in input frame (cast to FrameBlock)
+  * @return one code per row; NaN for missing or unknown tokens
+  */
+ protected double[] getCodeCol(CacheBlock in) {
+     Object[] coldata = (Object[]) ((FrameBlock) in).getColumnData(_colID - 1);
+     double[] codes = new double[in.getNumRows()];
+     // iterate over the rows we allocated codes for; the backing column
+     // array is presumably allowed to be longer than numRows (confirm)
+     for(int i = 0; i < codes.length; i++) {
+         Object okey = coldata[i];
+         String key = (okey != null) ? okey.toString() : null;
+         if(key == null || key.isEmpty()) {
+             // missing value: keep NaN, do not look it up in the map
+             codes[i] = Double.NaN;
+             continue;
+         }
+         long code = lookupRCDMap(key);
+         // negative code means the token is not in the recode map
+         codes[i] = (code < 0) ? Double.NaN : code;
+     }
+     return codes;
+ }
@Override
public void prepareBuildPartial() {
@@ -231,6 +245,12 @@ public class ColumnEncoderRecode extends ColumnEncoder {
public int getNumDistinctValues() {
return _rcdMap.size();
}
+
+ @Override
+ public void allocateMetaData(FrameBlock meta) {
+     // reserve one output row per distinct recoded token
+     int numRows = getNumDistinctValues();
+     meta.ensureAllocatedColumns(numRows);
+ }
@Override
public FrameBlock getMetaData(FrameBlock meta) {
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
index 3446fb1..075919e 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
@@ -47,6 +47,12 @@ public interface Encoder extends Externalizable {
* @return output matrix block
*/
MatrixBlock apply(CacheBlock in, MatrixBlock out, int outputCol);
+
+ /**
+  * Pre-allocate a FrameBlock for metadata collection.
+  * Called before {@link #getMetaData(FrameBlock)} so that the frame is
+  * already sized for the meta data this encoder writes; encoders that
+  * produce no meta data may leave the frame untouched.
+  * @param meta frame block to pre-allocate (modified in place)
+  */
+ void allocateMetaData(FrameBlock meta);
/**
* Construct a frame block out of the transform meta data.
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 4f5c2af..f593487 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -32,6 +32,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -48,6 +50,7 @@ import org.apache.sysds.runtime.data.SparseBlockMCSR;
import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.DependencyTask;
import org.apache.sysds.runtime.util.DependencyThreadPool;
import org.apache.sysds.runtime.util.DependencyWrapperTask;
@@ -103,7 +106,10 @@ public class MultiColumnEncoder implements Encoder {
}
else {
LOG.debug("Encoding with staged approach on: "
+ k + " Threads");
+ long t0 = System.nanoTime();
build(in, k);
+ long t1 = System.nanoTime();
+ LOG.debug("Elapsed time for build phase: "+
((double) t1 - t0) / 1000000 + " ms");
if(_legacyMVImpute != null) {
// These operations are redundant for
every encoder excluding the legacyMVImpute, the workaround to
// fix it for this encoder would be
very dirty. This will only have a performance impact if there
@@ -113,7 +119,10 @@ public class MultiColumnEncoder implements Encoder {
initMetaData(_meta);
}
// apply meta data
+ t0 = System.nanoTime();
out = apply(in, k);
+ t1 = System.nanoTime();
+ LOG.debug("Elapsed time for apply phase: "+
((double) t1 - t0) / 1000000 + " ms");
}
}
catch(Exception ex) {
@@ -351,15 +360,50 @@ public class MultiColumnEncoder implements Encoder {
}
@Override
+ public void allocateMetaData(FrameBlock meta) {
+     // delegate to each column encoder, which sizes its own part of the frame
+     _columnEncoders.forEach(encoder -> encoder.allocateMetaData(meta));
+ }
+
+ @Override
public FrameBlock getMetaData(FrameBlock meta) {
+     // delegate to the k-thread variant and propagate its result, so that a
+     // previously set _meta frame is returned instead of the caller's frame
+     return getMetaData(meta, 1);
+ }
+
+ /**
+  * Construct a frame block out of the transform meta data, collecting the
+  * per-column meta data with up to k threads.
+  *
+  * @param meta output frame block to fill (ignored if _meta is already set)
+  * @param k    degree of parallelism; values <= 1 run single-threaded
+  * @return _meta if already set, otherwise the filled meta frame
+  */
+ public FrameBlock getMetaData(FrameBlock meta, int k) {
if(_meta != null)
return _meta;
- for(ColumnEncoder columnEncoder : _columnEncoders)
- columnEncoder.getMetaData(meta);
+     long t0 = System.nanoTime();
+     // allocate all columns up front on this thread; presumably so the
+     // (possibly concurrent) per-encoder getMetaData calls below only write
+     // into an already-sized frame instead of resizing it
+     allocateMetaData(meta);
+     if(k > 1)
+         getMetaDataParallel(meta, k);
+     else {
+         for(ColumnEncoder columnEncoder : _columnEncoders)
+             columnEncoder.getMetaData(meta);
+     }
if(_legacyOmit != null)
_legacyOmit.getMetaData(meta);
if(_legacyMVImpute != null)
_legacyMVImpute.getMetaData(meta);
+     LOG.debug("Time spent getting metadata " + ((double) System.nanoTime() - t0) / 1000000 + " ms");
return meta;
}
+
+ /**
+  * Run getMetaData of all column encoders as tasks on a pool of k threads.
+  * The pool is shut down in every case, including task failure and interrupt.
+  *
+  * @param meta shared output frame (already allocated)
+  * @param k    number of threads
+  * @throws DMLRuntimeException if a task fails or the thread is interrupted
+  */
+ private void getMetaDataParallel(FrameBlock meta, int k) {
+     ExecutorService pool = CommonThreadPool.get(k);
+     try {
+         ArrayList<ColumnMetaDataTask<? extends ColumnEncoder>> tasks = new ArrayList<>();
+         for(ColumnEncoder columnEncoder : _columnEncoders)
+             tasks.add(new ColumnMetaDataTask<>(columnEncoder, meta));
+         // get() rethrows any task failure as ExecutionException
+         for(Future<Object> task : pool.invokeAll(tasks))
+             task.get();
+     }
+     catch(InterruptedException ex) {
+         // restore the interrupt flag before propagating
+         Thread.currentThread().interrupt();
+         throw new DMLRuntimeException(ex);
+     }
+     catch(Exception ex) {
+         throw new DMLRuntimeException(ex);
+     }
+     finally {
+         pool.shutdown();
+     }
+ }
@@ -852,5 +896,21 @@ public class MultiColumnEncoder implements Encoder {
return null;
}
}
+
+ /**
+  * Task that runs one column encoder's getMetaData against a shared output
+  * frame; used for multi-threaded meta data collection.
+  */
+ private static class ColumnMetaDataTask<T extends ColumnEncoder> implements Callable<Object> {
+     private final T _encoder;
+     private final FrameBlock _target;
+
+     protected ColumnMetaDataTask(T encoder, FrameBlock out) {
+         this._encoder = encoder;
+         this._target = out;
+     }
+
+     @Override
+     public Object call() throws Exception {
+         // the result lives in _target; the returned value is unused
+         _encoder.getMetaData(_target);
+         return null;
+     }
+ }
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
index bad4a5e..fbf7111 100644
---
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
@@ -208,7 +208,7 @@ public class TransformFrameEncodeMultithreadedTest extends
AutomatedTestBase {
Files.readAllLines(Paths.get(SPEC)).forEach(s ->
specSb.append(s).append("\n"));
MultiColumnEncoder encoder =
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
input.getNumColumns(), null);
- //MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
+ MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
MatrixBlock outputS = encoder.encode(input, 1);
MatrixBlock outputM = encoder.encode(input, 12);