This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c86b5d5  [MINOR] Improved spark exec type selection for unary 
aggregates
c86b5d5 is described below

commit c86b5d55896df7e429878f9debac6f95ef080284
Author: Matthias Boehm <[email protected]>
AuthorDate: Mon Jul 26 13:15:37 2021 +0200

    [MINOR] Improved spark exec type selection for unary aggregates
    
    This patch refines the execution type selection logic of unary
    aggregates. By default, the memory estimates and memory budget drive the
    decision of CP (in-memory) vs Spark (distributed, out-of-core). However,
    a unary aggregate is largely reducing the data size - for that reason,
    we pull unary aggregates whose inputs are created by spark instructions
    in spark execution type as well, significantly reducing collected data
    (i.e., transfered from spark executors to the spark driver).
    
    For robustness we only did that if the unary aggregate is the only
    parent, to avoid unnecessary redundant spark execution if we would then
    anyway pull the intermediate into local memory. Instead of just looking
    at the number of parents of the input, we now do a more informed
    analysis where these parents are execution - if all others are in spark,
    it's preferrable to pull this one single unary aggregate also into sark
    execution mode.
---
 src/main/java/org/apache/sysds/hops/AggUnaryOp.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java 
b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
index 9c18f49..424f730 100644
--- a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
@@ -372,10 +372,13 @@ public class AggUnaryOp extends MultiThreadedHop
 
                //spark-specific decision refinement (execute unary aggregate 
w/ spark input and 
                //single parent also in spark because it's likely cheap and 
reduces data transfer)
+               //we also allow multiple parents, if all other parents are 
already in Spark mode
                if( _etype == ExecType.CP && _etypeForced != ExecType.CP
                        && !(getInput().get(0) instanceof DataOp)  //input is 
not checkpoint
                        && (getInput().get(0).getParent().size()==1 //uagg is 
only parent, or 
-                          || !requiresAggregation(getInput().get(0), 
_direction)) //w/o agg
+                               || 
getInput().get(0).getParent().stream().filter(h -> h != this)
+                                       .allMatch(h -> h.optFindExecType() == 
ExecType.SPARK)
+                               || !requiresAggregation(getInput().get(0), 
_direction)) //w/o agg
                        && getInput().get(0).optFindExecType() == 
ExecType.SPARK )
                {
                        //pull unary aggregate into spark 

Reply via email to