This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 90ab153  [SYSTEMDS-2641] Fix spark remove empty instruction (broadcasts handling)
90ab153 is described below

commit 90ab1535d19569c7f2037392278021c014cb4e94
Author: Matthias Boehm <[email protected]>
AuthorDate: Wed Feb 3 19:01:25 2021 +0100

    [SYSTEMDS-2641] Fix spark remove empty instruction (broadcasts handling)
    
    This patch fixes a major issue in the Spark remove-empty instruction's
    handling of its broadcast flag. Since this flag is serialized as a
    normal named parameter, the positional parsing logic was wrong and
    always returned false, falling back to vector replication for a
    shuffle-based join of the selection vector.
    
    Furthermore, the patch contains a minor improvement of the slice-finding
    algorithm to track the elapsed time per lattice level.
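
    As a minimal, self-contained sketch (hypothetical class, not part of the
    patch itself), the essence of the fix is to resolve the flag by name from
    the instruction's parameter map at execution time, instead of parsing a
    fixed positional token of the serialized instruction:

        import java.util.HashMap;

        public class RmEmptyFlagSketch {
            public static void main(String[] args) {
                // the broadcast flag travels as a normal named parameter,
                // e.g. "bRmEmptyBC" -> "true"
                HashMap<String, String> params = new HashMap<>();
                params.put("bRmEmptyBC", "true");

                // broken variant: reading a fixed positional token (e.g.
                // parts[parts.length-2]) does not hit the flag value, so
                // Boolean.parseBoolean(...) always yielded false
                // fixed variant: look the flag up by its parameter name
                boolean bRmEmptyBC = Boolean.parseBoolean(params.get("bRmEmptyBC"));
                System.out.println("broadcast-based rmempty: " + bRmEmptyBC); // true
            }
        }
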
---
 scripts/builtin/slicefinder.dml                    |  5 ++++-
 .../spark/ParameterizedBuiltinSPInstruction.java   | 26 +++++++++-------------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/scripts/builtin/slicefinder.dml b/scripts/builtin/slicefinder.dml
index a6b8bfd..a9a1dff 100644
--- a/scripts/builtin/slicefinder.dml
+++ b/scripts/builtin/slicefinder.dml
@@ -43,6 +43,8 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e, Int k = 4,
     Int tpBlksz = 16, Boolean selFeat = FALSE, Boolean verbose = FALSE)
   return(Matrix[Double] TK, Matrix[Double] TKC, Matrix[Double] D)
 {
+       t1 = time();
+       
   # init debug matrix: levelID, enumerated S, valid S, TKmax, TKmin
   D = matrix(0, 0, 5); 
   
@@ -67,7 +69,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e, Int k = 4,
 
   if( verbose ) {
     [maxsc, minsc] = analyzeTopK(TKC);
-    print("SliceFinder: initial top-K: count="+nrow(TK)+", max="+maxsc+", 
min="+minsc)
+    print("SliceFinder: initial top-K: count="+nrow(TK)+", max="+maxsc+", 
min="+minsc+" (time="+(time()-t1)+")")
     D = rbind(D, t(as.matrix(list(1, n2, nrow(S), maxsc, minsc))));
   }
 
@@ -116,6 +118,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e, Int k = 4,
       valid = as.integer(sum(R[,2]>0 & R[,4]>=minSup));
       print(" -- valid slices after eval: "+valid+"/"+nrow(S));
       print(" -- top-K: count="+nrow(TK)+", max="+maxsc+", min="+minsc);
+      print(" -- (time="+(time()-t1)+")")
       D = rbind(D, t(as.matrix(list(level, nrow(S), valid, maxsc, minsc))));
     }
   }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index cee035b..c6f065c 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -84,14 +84,10 @@ import java.util.Iterator;
 
 public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction {
        protected HashMap<String, String> params;
-       // removeEmpty-specific attributes
-       private boolean _bRmEmptyBC = false;
 
-       ParameterizedBuiltinSPInstruction(Operator op, HashMap<String, String> paramsMap, CPOperand out, String opcode,
-                       String istr, boolean bRmEmptyBC) {
+       ParameterizedBuiltinSPInstruction(Operator op, HashMap<String, String> paramsMap, CPOperand out, String opcode, String istr) {
                super(SPType.ParameterizedBuiltin, op, null, null, out, opcode, istr);
                params = paramsMap;
-               _bRmEmptyBC = bRmEmptyBC;
        }
 
        public HashMap<String,String> getParams() { return params; }
@@ -127,8 +123,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        paramsMap.put(Statement.GAGG_NUM_GROUPS, parts[4]);
                        
                        Operator op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), CorrectionLocationType.LASTCOLUMN);
-                       
-                       return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
+                       return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str);
                }
                else
                {
@@ -150,12 +145,12 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                                                throw new DMLRuntimeException("Mandatory \"order\" must be specified when fn=\"centralmoment\" in groupedAggregate.");
                                }
                                Operator op = InstructionUtils.parseGroupedAggOperator(fnStr, paramsMap.get("order"));
-                               return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
+                               return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str);
                        } 
                        else if (opcode.equalsIgnoreCase("rmempty")) {
                                func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-                               return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func),
-                                       paramsMap, out, opcode, str, Boolean.parseBoolean(parts[parts.length-2]));
+                               return new ParameterizedBuiltinSPInstruction(
+                                       new SimpleOperator(func), paramsMap, out, opcode, str);
                        }
                        else if (opcode.equalsIgnoreCase("rexpand")
                                || opcode.equalsIgnoreCase("replace")
@@ -164,7 +159,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                                || opcode.equalsIgnoreCase("transformapply")
                                || opcode.equalsIgnoreCase("transformdecode")) {
                                func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-                               return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+                               return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str);
                        }
                        else {
                                throw new DMLRuntimeException("Unknown opcode (" + opcode + ") for ParameterizedBuiltin Instruction.");
@@ -311,6 +306,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        boolean rows = sec.getScalarInput(params.get("margin"), ValueType.STRING, true).getStringValue().equals("rows");
                        boolean emptyReturn = Boolean.parseBoolean(params.get("empty.return").toLowerCase());
                        long maxDim = sec.getScalarInput(params.get("maxdim"), ValueType.FP64, false).getLongValue();
+                       boolean bRmEmptyBC = Boolean.parseBoolean(params.get("bRmEmptyBC"));
                        DataCharacteristics mcIn = sec.getDataCharacteristics(rddInVar);
                        
                        if( maxDim > 0 ) //default case
@@ -325,7 +321,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                                //execute remove empty rows/cols operation
                                JavaPairRDD<MatrixIndexes,MatrixBlock> out;
        
-                               if(_bRmEmptyBC){
+                               if(bRmEmptyBC){
                                        broadcastOff = sec.getBroadcastForVariable( rddOffVar );
                                        // Broadcast offset vector
                                        out = in
@@ -343,10 +339,10 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                                //store output rdd handle
                                sec.setRDDHandleForVariable(output.getName(), out);
                                sec.addLineageRDD(output.getName(), rddInVar);
-                               if(!_bRmEmptyBC)
-                                       sec.addLineageRDD(output.getName(), rddOffVar);
-                               else
+                               if(bRmEmptyBC)
                                        sec.addLineageBroadcast(output.getName(), rddOffVar);
+                               else
+                                       sec.addLineageRDD(output.getName(), rddOffVar);
                                
                                //update output statistics (required for correctness)
                                DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
