This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 9316ae2 [SYSTEMDS-3279] Parallel allocation for transformencode
9316ae2 is described below
commit 9316ae2c1c0f238d1c529ef8078020b3c89b44d9
Author: arnabp <[email protected]>
AuthorDate: Wed Jan 26 15:11:21 2022 +0100
[SYSTEMDS-3279] Parallel allocation for transformencode
This patch adds an optimization that runs output allocation in
parallel with the Build phase for binning followed by dummycoding
and for feature hashing followed by dummycoding. In these cases the
number of output columns (#bins, K) can be derived even before the
Build phase completes. Moreover, this patch fixes a bug to allow
feature hashing followed by dummycoding.
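For context, the observation can be sketched as a small helper
(illustrative only, not code from this patch; the encoder class
names are assumed to match the existing SystemDS ones):

	// Illustrative sketch: for a dummycoded column, the DC domain size
	// (output width) is known before Build for bin/hash, but not for recode.
	static long domainSizeKnownUpfront(ColumnEncoder build, int numBins, int k) {
		if(build instanceof ColumnEncoderBin)
			return numBins;  // #bins is fixed by the transform spec
		if(build instanceof ColumnEncoderFeatureHash)
			return k;        // hash domain size K is fixed by the spec
		return -1;           // recode: #distincts known only after Build
	}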
---
.../transform/encode/ColumnEncoderComposite.java | 10 ++++-
.../transform/encode/MultiColumnEncoder.java | 48 ++++++++++++++++------
.../apache/sysds/runtime/util/DependencyTask.java | 6 ++-
.../sysds/runtime/util/DependencyThreadPool.java | 2 +-
.../TransformFrameEncodeMultithreadedTest.java | 17 +++++++-
.../datasets/homes3/homes.tfspec_hash_dummy.json | 2 +
6 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index a4a9563..b7c162b 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -161,13 +161,19 @@ public class ColumnEncoderComposite extends ColumnEncoder {
}
tasks.addAll(t);
}
+
		List<List<? extends Callable<?>>> dep = new ArrayList<>(Collections.nCopies(tasks.size(), null));
		DependencyThreadPool.createDependencyList(tasks, depMap, dep);
+		// If DC is required, add an UpdateDC task that updates the domain size as the last task.
+		// Only for a recode build must UpdateDC depend on the Build task; otherwise it can run independently.
		if(hasEncoder(ColumnEncoderDummycode.class)) {
			tasks.add(DependencyThreadPool.createDependencyTask(new ColumnCompositeUpdateDCTask(this)));
-			dep.add(tasks.subList(tasks.size() - 2, tasks.size() - 1));
+			if(_columnEncoders.get(0) instanceof ColumnEncoderRecode) {
+				dep.add(tasks.subList(tasks.size() - 2, tasks.size() - 1));
+				return DependencyThreadPool.createDependencyTasks(tasks, dep);
+			}
		}
-		return DependencyThreadPool.createDependencyTasks(tasks, dep);
+		return DependencyThreadPool.createDependencyTasks(tasks, null);
}
@Override
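For readers following the dep list in the hunk above: dep is kept
index-aligned with tasks, and a non-null entry lists the tasks that
the task at that index must wait for. A toy, self-contained
illustration of the subList pattern (hypothetical stand-in tasks,
not the actual encoder tasks):

	import java.util.ArrayList;
	import java.util.List;
	import java.util.concurrent.Callable;

	class DepListSketch {
		public static void main(String[] args) {
			Callable<Object> buildTask = () -> null;    // stands in for a Build task
			Callable<Object> updateDCTask = () -> null; // stands in for ColumnCompositeUpdateDCTask
			List<Callable<?>> tasks = new ArrayList<>();
			List<List<? extends Callable<?>>> dep = new ArrayList<>();
			tasks.add(buildTask);
			dep.add(null); // the Build task waits for nothing
			tasks.add(updateDCTask);
			// UpdateDC (last) waits for the task right before it (Build):
			dep.add(tasks.subList(tasks.size() - 2, tasks.size() - 1));
		}
	}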
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 09ecf64..053c452 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -141,7 +141,7 @@ public class MultiColumnEncoder implements Encoder {
	 * BuildTask: Build an encoder
	 * ColumnCompositeUpdateDCTask: Update domain size of a DC encoder based on #distincts, #bins, K
	 * ColumnMetaDataTask: Fill up metadata of an encoder
-	 * ApplyTasksWrapperTask: Wrapper task for an Apply
+	 * ApplyTasksWrapperTask: Wrapper task for an Apply task
	 * UpdateOutputColTask: Set starting offsets of the DC columns
	 */
	private List<DependencyTask<?>> getEncodeTasks(CacheBlock in, MatrixBlock out, DependencyThreadPool pool) {
@@ -150,6 +150,7 @@ public class MultiColumnEncoder implements Encoder {
Map<Integer[], Integer[]> depMap = new HashMap<>();
		boolean hasDC = getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
boolean applyOffsetDep = false;
+ boolean independentUpdateDC = false;
_meta = new FrameBlock(in.getNumColumns(), ValueType.STRING);
// Create the output and metadata allocation tasks
		tasks.add(DependencyThreadPool.createDependencyTask(new InitOutputMatrixTask(this, in, out)));
@@ -160,15 +161,36 @@ public class MultiColumnEncoder implements Encoder {
			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in);
tasks.addAll(buildTasks);
if(buildTasks.size() > 0) {
-				// Apply Task depends on build completion task
-				depMap.put(new Integer[] {tasks.size(), tasks.size() + 1},     //ApplyTask
-					new Integer[] {tasks.size() - 1, tasks.size()});           //BuildTask
-				// getMetaDataTask depends on build completion
-				depMap.put(new Integer[] {tasks.size() + 1, tasks.size() + 2}, //MetaDataTask
-					new Integer[] {tasks.size() - 1, tasks.size()});           //BuildTask
-				// AllocMetaTask depends on the build completion tasks
-				depMap.put(new Integer[] {1, 2},                               //AllocMetaTask (2nd task)
-					new Integer[] {tasks.size() - 1, tasks.size()});           //BuildTask
+				// Check if any Build independent UpdateDC task (Bin+DC, FH+DC)
+				if (e.hasEncoder(ColumnEncoderDummycode.class)
+					&& buildTasks.size() > 1 //filter out FH
+					&& !buildTasks.get(buildTasks.size()-2).hasDependency(buildTasks.get(buildTasks.size()-1)))
+					independentUpdateDC = true;
+
+				// Independent UpdateDC task
+				if (independentUpdateDC) {
+					// Apply Task depends on task prior to UpdateDC (Build/MergePartialBuild)
+					depMap.put(new Integer[] {tasks.size(), tasks.size() + 1},     //ApplyTask
+						new Integer[] {tasks.size() - 2, tasks.size() - 1});       //BuildTask
+					// getMetaDataTask depends on task prior to UpdateDC
+					depMap.put(new Integer[] {tasks.size() + 1, tasks.size() + 2}, //MetaDataTask
+						new Integer[] {tasks.size() - 2, tasks.size() - 1});       //BuildTask
+				}
+				else {
+					// Apply Task depends on the last task (Build/MergePartial/UpdateDC)
+					depMap.put(new Integer[] {tasks.size(), tasks.size() + 1},     //ApplyTask
+						new Integer[] {tasks.size() - 1, tasks.size()});           //Build/UpdateDC
+					// getMetaDataTask depends on build completion
+					depMap.put(new Integer[] {tasks.size() + 1, tasks.size() + 2}, //MetaDataTask
+						new Integer[] {tasks.size() - 1, tasks.size()});           //Build/UpdateDC
+				}
+				// AllocMetaTask never depends on the UpdateDC task
+				if (e.hasEncoder(ColumnEncoderDummycode.class) && buildTasks.size() > 1)
+					depMap.put(new Integer[] {1, 2},                               //AllocMetaTask (2nd task)
+						new Integer[] {tasks.size() - 2, tasks.size() - 1});       //BuildTask
+				else
+					depMap.put(new Integer[] {1, 2},                               //AllocMetaTask (2nd task)
+						new Integer[] {tasks.size() - 1, tasks.size()});           //BuildTask
}
// getMetaDataTask depends on AllocMeta task
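A note on the Integer[] pairs in the hunk above: each depMap entry
maps a dependent half-open index range [k0, k1) over the task list to
the range [v0, v1) it waits on, so {1, 2} addresses exactly the second
task (AllocMetaTask). A hedged sketch of the expansion, simplified from
what DependencyThreadPool.createDependencyList presumably does:

	import java.util.ArrayList;
	import java.util.List;
	import java.util.Map;
	import java.util.concurrent.Callable;

	class DepMapSketch {
		// Simplified sketch, assuming {from, to} denotes the half-open range [from, to)
		static void expandDepMap(Map<Integer[], Integer[]> depMap,
				List<? extends Callable<?>> tasks, List<List<Callable<?>>> dep) {
			depMap.forEach((dependent, dependsOn) -> {
				for(int i = dependent[0]; i < dependent[1]; i++) {
					if(dep.get(i) == null)
						dep.set(i, new ArrayList<>());
					// every task in the dependent range waits for all tasks in dependsOn
					dep.get(i).addAll(tasks.subList(dependsOn[0], dependsOn[1]));
				}
			});
		}
	}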
@@ -356,8 +378,8 @@ public class MultiColumnEncoder implements Encoder {
			if (MatrixBlock.DEFAULT_SPARSEBLOCK != SparseBlock.Type.CSR
				&& MatrixBlock.DEFAULT_SPARSEBLOCK != SparseBlock.Type.MCSR)
				throw new RuntimeException("Transformapply is only supported for MCSR and CSR output matrix");
-			boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == SparseBlock.Type.MCSR;
-			mcsr = false; //force CSR for transformencode
+			//boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == SparseBlock.Type.MCSR;
+			boolean mcsr = false; //force CSR for transformencode
if (mcsr) {
output.allocateBlock();
SparseBlock block = output.getSparseBlock();
@@ -461,7 +483,7 @@ public class MultiColumnEncoder implements Encoder {
columnEncoder.getMetaData(meta);
}
- //_columnEncoders.stream().parallel().forEach(columnEncoder ->
+ //_columnEncoders.stream().parallel().forEach(columnEncoder ->
// columnEncoder.getMetaData(meta));
if(_legacyOmit != null)
_legacyOmit.getMetaData(meta);
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
index 17351a6..69c25fe 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
@@ -68,11 +68,15 @@ public class DependencyTask<E> implements Comparable<DependencyTask<?>>, Callabl
return isReady();
}
}
-
+
public void addDependent(DependencyTask<?> dependencyTask) {
_dependantTasks.add(dependencyTask);
dependencyTask._rdy += 1;
}
+
+	public boolean hasDependency(DependencyTask<?> dependencyTask) {
+ return _dependantTasks.contains(dependencyTask);
+ }
@Override
public E call() throws Exception {
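Note the direction of the new check: _dependantTasks holds the tasks
that wait on this task, so a.hasDependency(b) asks whether b is
registered as waiting on a. This is how getEncodeTasks in
MultiColumnEncoder uses it to detect a Build-independent UpdateDC task:

	// UpdateDC (last build task) is independent iff the preceding
	// build task does not list it as a dependent:
	boolean independentUpdateDC = !buildTasks.get(buildTasks.size() - 2)
		.hasDependency(buildTasks.get(buildTasks.size() - 1));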
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
index 90d1dfc..1e8f99b 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
@@ -148,7 +148,7 @@ public class DependencyThreadPool {
List<List<? extends Callable<?>>> dependencies) {
if(dependencies != null && tasks.size() != dependencies.size())
throw new DMLRuntimeException(
- "Could not create DependencyTasks since the
input array sizes are where mismatched");
+ "Could not create DependencyTasks since the
input array sizes are mismatching");
List<DependencyTask<?>> ret = new ArrayList<>();
Map<Callable<?>, DependencyTask<?>> map = new HashMap<>();
for(Callable<?> task : tasks) {
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
index 560f934..ff24a99 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
@@ -56,13 +56,14 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
	private final static String SPEC7 = "homes3/homes.tfspec_binDummy.json"; // recode+dummy
	private final static String SPEC8 = "homes3/homes.tfspec_hash.json";
	private final static String SPEC9 = "homes3/homes.tfspec_hash_recode.json";
+	private final static String SPEC10 = "homes3/homes.tfspec_hash_dummy.json";
private static final int[] BIN_col3 = new int[] {1, 4, 2, 3, 3, 2, 4};
private static final int[] BIN_col8 = new int[] {1, 2, 2, 2, 2, 2, 3};
public enum TransformType {
RECODE, DUMMY, DUMMY_ALL, // to test sparse
- RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE,
+ RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE, HASH_DUMMY
}
@Override
@@ -112,6 +113,11 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
}
@Test
+ public void testHomesHashDummyCodeNonStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.HASH_DUMMY, false);
+ }
+
+ @Test
public void testHomesRecodeStaged() {
		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.RECODE, true);
}
@@ -151,6 +157,11 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.HASH_RECODE, true);
}
+ @Test
+ public void testHomesHashDummyCodeStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.HASH_DUMMY, true);
+ }
+
	private void runTransformTest(ExecMode rt, String ofmt, TransformType type, boolean staged) {
// set transform specification
@@ -189,6 +200,10 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
SPEC = SPEC9;
DATASET = DATASET1;
break;
+ case HASH_DUMMY:
+ SPEC = SPEC10;
+ DATASET = DATASET1;
+ break;
}
if(!ofmt.equals("csv"))
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_hash_dummy.json b/src/test/resources/datasets/homes3/homes.tfspec_hash_dummy.json
new file mode 100644
index 0000000..631b3be
--- /dev/null
+++ b/src/test/resources/datasets/homes3/homes.tfspec_hash_dummy.json
@@ -0,0 +1,2 @@
+{
+ "ids": true, "hash": [ 1, 2, 7 ], "K": 100, "dummycode": [ 1, 2, 3, 6 ] }