This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new db383202d9 [SYSTEMDS-3466] Asynchronous (Future-based) execution of Spark instructions
db383202d9 is described below

commit db383202d9fb45ea4ba2fe0877eb1cad30503b12
Author: Arnab Phani <[email protected]>
AuthorDate: Wed Nov 16 18:32:10 2022 +0100

    [SYSTEMDS-3466] Asynchronous (Future-based) execution of Spark instructions
    
    This patch introduces future-based asynchronous execution of Spark actions.
    We wrap the result matrix block in a future, create a matrix object handle
    for it, and maintain that handle in the symbol table. This extension allows
    triggering a chain of Spark instructions asynchronously and retrieving the
    results only when they are needed.
    
    TODO: Account for the memory required by the future results, and maintain
    the lineage of broadcast variables to avoid their premature removal.
    
    Closes #1733
---
 .../controlprogram/context/ExecutionContext.java   | 11 +++
 .../controlprogram/context/MatrixObjectFuture.java | 87 ++++++++++++++++++++++
 .../spark/AggregateUnarySPInstruction.java         | 76 ++++++++++++++++---
 3 files changed, 164 insertions(+), 10 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
index 7e3076c620..63eb3baa8d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
@@ -67,6 +67,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 public class ExecutionContext {
@@ -601,6 +602,29 @@ public class ExecutionContext {
                mo.release();
        }

+       /**
+        * Binds {@code varName} to a matrix object whose data is produced by the given
+        * future, e.g., the result of an asynchronously triggered Spark action.
+        * If the variable does not exist and auto-creation of variables is enabled, a
+        * new FP64 matrix object (unique temporary file name, no metadata) is created;
+        * otherwise the value type, file name, and metadata of the existing matrix
+        * object are reused.
+        *
+        * @param varName name of the output variable
+        * @param fmb future that yields the output matrix block
+        */
+       public void setMatrixOutput(String varName, Future<MatrixBlock> fmb) {
+               if (isAutoCreateVars() && !containsVariable(varName)) {
+                       //register the new object and stop here; falling through would
+                       //look up a variable that does not exist
+                       setVariable(varName, new MatrixObjectFuture(Types.ValueType.FP64,
+                               OptimizerUtils.getUniqueTempFileName(), fmb));
+                       return;
+               }
+               MatrixObject mo = getMatrixObject(varName);
+               setVariable(varName, new MatrixObjectFuture(mo, fmb));
+       }
+
        public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag) {
                if( isAutoCreateVars() && !containsVariable(varName) )
                        setVariable(varName, createMatrixObject(outputData));
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/MatrixObjectFuture.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/MatrixObjectFuture.java
new file mode 100644
index 0000000000..3cbc7eff09
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/MatrixObjectFuture.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.context;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+import java.util.concurrent.Future;
+
+/**
+ * Matrix object whose data is supplied by a {@link Future} (e.g., the result of an
+ * asynchronously triggered Spark action) rather than an in-memory block. Reads wait
+ * for the future to complete; the resolved block is never assigned to {@code _data}.
+ */
+public class MatrixObjectFuture extends MatrixObject
+{
+       //pending or completed result; reset to null only by clearData
+       protected volatile Future<MatrixBlock> _futureData;
+
+       /**
+        * Creates a future-backed matrix object without metadata.
+        *
+        * @param vt value type of the matrix
+        * @param file file name associated with the matrix object
+        * @param fmb future that yields the matrix block
+        */
+       public MatrixObjectFuture(ValueType vt, String file, Future<MatrixBlock> fmb) {
+               super(vt, file, null);
+               _futureData = fmb;
+       }
+
+       /**
+        * Creates a future-backed matrix object that reuses the value type, file
+        * name, and metadata of an existing matrix object.
+        *
+        * @param mo matrix object providing value type, file name, and metadata
+        * @param fmb future that yields the matrix block
+        */
+       public MatrixObjectFuture(MatrixObject mo, Future<MatrixBlock> fmb) {
+               super(mo.getValueType(), mo.getFileName(), mo.getMetaData());
+               _futureData = fmb;
+       }
+
+       /** Waits for and returns the future result without acquiring a read. */
+       MatrixBlock getMatrixBlock() {
+               return waitForResult();
+       }
+
+       /**
+        * Acquires a read on this object and waits for the future result.
+        *
+        * @return the matrix block produced by the future
+        * @throws DMLRuntimeException if the object is not readable, holds in-memory
+        *         data, has been cleared, or the future failed or was interrupted
+        */
+       @Override
+       public MatrixBlock acquireRead() {
+               return acquireReadIntern();
+       }
+
+       private synchronized MatrixBlock acquireReadIntern() {
+               //validate before acquire(...) so a failed check leaves no read behind,
+               //and outside any catch so these messages are not wrapped again
+               if(!isAvailableToRead())
+                       throw new DMLRuntimeException("MatrixObject not available to read.");
+               if(_data != null)
+                       throw new DMLRuntimeException("_data must be null for future matrix object/block.");
+               if(_futureData == null)
+                       throw new DMLRuntimeException("Future matrix object has already been cleared.");
+               acquire(false, false);
+               return waitForResult();
+       }
+
+       /**
+        * Releases this object after a read. The future is kept so that the variable
+        * can be read again (e.g., inside a loop); only clearData drops it.
+        */
+       @Override
+       public void release() {
+               //NOTE(review): the read taken by acquire(false, false) in acquireRead is
+               //not released here; confirm whether the cache status needs to be reset.
+       }
+
+       /**
+        * Drops the future and any in-memory data and resets this object to empty.
+        *
+        * @param tid not used by this implementation
+        */
+       @Override
+       public synchronized void clearData(long tid) {
+               _data = null;
+               _futureData = null;
+               clearCache();
+               setCacheLineage(null);
+               setDirty(false);
+               setEmpty();
+       }
+
+       //waits for the future; failures are wrapped, and an interrupt is re-asserted
+       private MatrixBlock waitForResult() {
+               Future<MatrixBlock> future = _futureData;
+               if(future == null)
+                       throw new DMLRuntimeException("Future matrix object has already been cleared.");
+               try {
+                       return future.get();
+               }
+               catch(InterruptedException e) {
+                       Thread.currentThread().interrupt();
+                       throw new DMLRuntimeException(e);
+               }
+               catch(Exception e) {
+                       throw new DMLRuntimeException(e);
+               }
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
index 38b032ca67..52bab3958f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
@@ -25,6 +25,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.CorrectionLocationType;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -43,9 +44,15 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 import scala.Tuple2;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 public class AggregateUnarySPInstruction extends UnarySPInstruction {
        private SparkAggType _aggtype = null;
        private AggregateOperator _aop = null;
@@ -102,19 +109,36 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
                //perform aggregation if necessary and put output into symbol table
                if( _aggtype == SparkAggType.SINGLE_BLOCK )
                {
-                       if( auop.sparseSafe )
-                               out = out.filter(new FilterNonEmptyBlocksFunction());
+                       if (ConfigurationManager.isPrefetchEnabled()) {
+                               //Trigger the chain of Spark operations and keep a future to the result (TODO: account for its memory)
+                               try {
+                                       synchronized(CommonThreadPool.class) { //guard the lazy pool creation against concurrent callers (e.g., parfor)
+                                               if(CommonThreadPool.triggerRemoteOPsPool == null)
+                                                       CommonThreadPool.triggerRemoteOPsPool = Executors.newCachedThreadPool();
+                                       }
+                                       Future<MatrixBlock> future_out = CommonThreadPool.triggerRemoteOPsPool.submit(new RDDAggregateTask(_optr, _aop, in, mc));
+                                       sec.setMatrixOutput(output.getName(), future_out);
+                               }
+                               catch(Exception ex) {
+                                       throw new DMLRuntimeException(ex);
+                               }
+                       }

-                       JavaRDD<MatrixBlock> out2 = out.map(
-                                       new RDDUAggFunction2(auop, mc.getBlocksize()));
-                       MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, aggop);
+                       else {
+                               if( auop.sparseSafe )
+                                       out = out.filter(new FilterNonEmptyBlocksFunction());

-                       //drop correction after aggregation
-                       out3.dropLastRowsOrColumns(aggop.correction);
+                               JavaRDD<MatrixBlock> out2 = out.map(
+                                               new RDDUAggFunction2(auop, mc.getBlocksize()));
+                               MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, aggop);

-                       //put output block into symbol table (no lineage because single block)
-                       //this also includes implicit maintenance of matrix characteristics
-                       sec.setMatrixOutput(output.getName(), out3);
+                               //drop correction after aggregation
+                               out3.dropLastRowsOrColumns(aggop.correction);
+
+                               //put output block into symbol table (no lineage because single block)
+                               //this also includes implicit maintenance of matrix characteristics
+                               sec.setMatrixOutput(output.getName(), out3);
+                       }
                }
                else //MULTI_BLOCK or NONE
                {
@@ -337,4 +361,50 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
                        return out;
                }
        }
+
+       /**
+        * Runs the single-block unary aggregation as a chain of Spark operations:
+        * removal of empty blocks for sparse-safe operators, per-block aggregation
+        * via RDDUAggFunction2, and RDDAggregateUtils.aggStable. Submitted to a
+        * thread pool so that the caller only holds a future to the result.
+        */
+       private static class RDDAggregateTask implements Callable<MatrixBlock>
+       {
+               private final AggregateUnaryOperator _auop;
+               private final AggregateOperator _aop;
+               private final JavaPairRDD<MatrixIndexes, MatrixBlock> _in;
+               private final DataCharacteristics _mc;
+
+               /**
+                * Captures the inputs of the aggregation.
+                *
+                * @param optr unary aggregate operator, expected to be an AggregateUnaryOperator
+                * @param aop aggregate operator for the final aggregation and correction removal
+                * @param input blocked input RDD
+                * @param dc data characteristics of the input (provides the block size)
+                */
+               RDDAggregateTask(Operator optr, AggregateOperator aop,
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> input, DataCharacteristics dc) {
+                       //cast eagerly so an unexpected operator fails in the caller, not in the pool thread
+                       _auop = (AggregateUnaryOperator) optr;
+                       _aop = aop;
+                       _in = input;
+                       _mc = dc;
+               }
+
+               @Override
+               public MatrixBlock call() {
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = _in;
+                       if( _auop.sparseSafe )
+                               out = out.filter(new FilterNonEmptyBlocksFunction());
+
+                       JavaRDD<MatrixBlock> out2 = out.map(
+                               new RDDUAggFunction2(_auop, _mc.getBlocksize()));
+                       MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, _aop);
+
+                       //drop correction after aggregation
+                       out3.dropLastRowsOrColumns(_aop.correction);
+                       return out3;
+               }
+       }
 }

Reply via email to