This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 90ab153 [SYSTEMDS-2641] Fix spark remove empty instruction
(broadcasts handling)
90ab153 is described below
commit 90ab1535d19569c7f2037392278021c014cb4e94
Author: Matthias Boehm <[email protected]>
AuthorDate: Wed Feb 3 19:01:25 2021 +0100
[SYSTEMDS-2641] Fix spark remove empty instruction (broadcasts handling)
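This patch fixes a major issue with how the Spark removeEmpty instruction
handles its broadcast flag. This flag is serialized as a normal named
parameter, but the old parsing logic read it from a fixed position in the
instruction string. That logic was therefore wrong and always returned false.
As a result, the instruction never broadcast the selection vector. Instead, it
fell back to replicating the selection vector for a shuffle-based join.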
Furthermore, the patch makes a minor improvement to the slice-finding
algorithm. Its verbose output now reports the elapsed time, measured from the
start of the function, after the initial top-K and after each lattice level.
---
scripts/builtin/slicefinder.dml | 5 ++++-
.../spark/ParameterizedBuiltinSPInstruction.java | 26 +++++++++-------------
2 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/scripts/builtin/slicefinder.dml b/scripts/builtin/slicefinder.dml
index a6b8bfd..a9a1dff 100644
--- a/scripts/builtin/slicefinder.dml
+++ b/scripts/builtin/slicefinder.dml
@@ -43,6 +43,8 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
Int k = 4,
Int tpBlksz = 16, Boolean selFeat = FALSE, Boolean verbose = FALSE)
return(Matrix[Double] TK, Matrix[Double] TKC, Matrix[Double] D)
{
+ t1 = time();
+
# init debug matrix: levelID, enumerated S, valid S, TKmax, TKmin
D = matrix(0, 0, 5);
@@ -67,7 +69,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
Int k = 4,
if( verbose ) {
[maxsc, minsc] = analyzeTopK(TKC);
- print("SliceFinder: initial top-K: count="+nrow(TK)+", max="+maxsc+",
min="+minsc)
+ print("SliceFinder: initial top-K: count="+nrow(TK)+", max="+maxsc+",
min="+minsc+" (time="+(time()-t1)+")")
D = rbind(D, t(as.matrix(list(1, n2, nrow(S), maxsc, minsc))));
}
@@ -116,6 +118,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double]
e, Int k = 4,
valid = as.integer(sum(R[,2]>0 & R[,4]>=minSup));
print(" -- valid slices after eval: "+valid+"/"+nrow(S));
print(" -- top-K: count="+nrow(TK)+", max="+maxsc+", min="+minsc);
+ print(" -- (time="+(time()-t1)+")")
D = rbind(D, t(as.matrix(list(level, nrow(S), valid, maxsc, minsc))));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index cee035b..c6f065c 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -84,14 +84,10 @@ import java.util.Iterator;
public class ParameterizedBuiltinSPInstruction extends
ComputationSPInstruction {
protected HashMap<String, String> params;
- // removeEmpty-specific attributes
- private boolean _bRmEmptyBC = false;
- ParameterizedBuiltinSPInstruction(Operator op, HashMap<String, String>
paramsMap, CPOperand out, String opcode,
- String istr, boolean bRmEmptyBC) {
+ ParameterizedBuiltinSPInstruction(Operator op, HashMap<String, String>
paramsMap, CPOperand out, String opcode, String istr) {
super(SPType.ParameterizedBuiltin, op, null, null, out, opcode,
istr);
params = paramsMap;
- _bRmEmptyBC = bRmEmptyBC;
}
public HashMap<String,String> getParams() { return params; }
@@ -127,8 +123,7 @@ public class ParameterizedBuiltinSPInstruction extends
ComputationSPInstruction
paramsMap.put(Statement.GAGG_NUM_GROUPS, parts[4]);
Operator op = new AggregateOperator(0,
KahanPlus.getKahanPlusFnObject(), CorrectionLocationType.LASTCOLUMN);
-
- return new ParameterizedBuiltinSPInstruction(op,
paramsMap, out, opcode, str, false);
+ return new ParameterizedBuiltinSPInstruction(op,
paramsMap, out, opcode, str);
}
else
{
@@ -150,12 +145,12 @@ public class ParameterizedBuiltinSPInstruction extends
ComputationSPInstruction
throw new
DMLRuntimeException("Mandatory \"order\" must be specified when
fn=\"centralmoment\" in groupedAggregate.");
}
Operator op =
InstructionUtils.parseGroupedAggOperator(fnStr, paramsMap.get("order"));
- return new
ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
+ return new
ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str);
}
else if (opcode.equalsIgnoreCase("rmempty")) {
func =
ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
- return new
ParameterizedBuiltinSPInstruction(new SimpleOperator(func),
- paramsMap, out, opcode, str,
Boolean.parseBoolean(parts[parts.length-2]));
+ return new ParameterizedBuiltinSPInstruction(
+ new SimpleOperator(func), paramsMap,
out, opcode, str);
}
else if (opcode.equalsIgnoreCase("rexpand")
|| opcode.equalsIgnoreCase("replace")
@@ -164,7 +159,7 @@ public class ParameterizedBuiltinSPInstruction extends
ComputationSPInstruction
|| opcode.equalsIgnoreCase("transformapply")
|| opcode.equalsIgnoreCase("transformdecode")) {
func =
ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
- return new
ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out,
opcode, str, false);
+ return new
ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out,
opcode, str);
}
else {
throw new DMLRuntimeException("Unknown opcode
(" + opcode + ") for ParameterizedBuiltin Instruction.");
@@ -311,6 +306,7 @@ public class ParameterizedBuiltinSPInstruction extends
ComputationSPInstruction
boolean rows = sec.getScalarInput(params.get("margin"),
ValueType.STRING, true).getStringValue().equals("rows");
boolean emptyReturn =
Boolean.parseBoolean(params.get("empty.return").toLowerCase());
long maxDim = sec.getScalarInput(params.get("maxdim"),
ValueType.FP64, false).getLongValue();
+ boolean bRmEmptyBC =
Boolean.parseBoolean(params.get("bRmEmptyBC"));
DataCharacteristics mcIn =
sec.getDataCharacteristics(rddInVar);
if( maxDim > 0 ) //default case
@@ -325,7 +321,7 @@ public class ParameterizedBuiltinSPInstruction extends
ComputationSPInstruction
//execute remove empty rows/cols operation
JavaPairRDD<MatrixIndexes,MatrixBlock> out;
- if(_bRmEmptyBC){
+ if(bRmEmptyBC){
broadcastOff =
sec.getBroadcastForVariable( rddOffVar );
// Broadcast offset vector
out = in
@@ -343,10 +339,10 @@ public class ParameterizedBuiltinSPInstruction extends
ComputationSPInstruction
//store output rdd handle
sec.setRDDHandleForVariable(output.getName(),
out);
sec.addLineageRDD(output.getName(), rddInVar);
- if(!_bRmEmptyBC)
- sec.addLineageRDD(output.getName(),
rddOffVar);
- else
+ if(bRmEmptyBC)
sec.addLineageBroadcast(output.getName(), rddOffVar);
+ else
+ sec.addLineageRDD(output.getName(),
rddOffVar);
//update output statistics (required for
correctness)
DataCharacteristics mcOut =
sec.getDataCharacteristics(output.getName());