Repository: incubator-systemml
Updated Branches:
  refs/heads/master 4c162afd9 -> a429e2df9


[SYSTEMML-1433] Fix unnecessarily blocking rdd and broadcast cleanup

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f380b52a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f380b52a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f380b52a

Branch: refs/heads/master
Commit: f380b52a9d041798b268de8007140b418af1e0db
Parents: 4c162af
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Wed Mar 22 21:27:00 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Thu Mar 23 13:39:37 2017 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java            | 18 +++++++++---------
 .../spark/data/PartitionedBroadcast.java          |  9 +++++++++
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f380b52a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 75a6334..106acc0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -85,9 +85,9 @@ public class SparkExecutionContext extends ExecutionContext
        private static final boolean LDEBUG = false; //local debug flag
        
        //internal configurations 
-       private static boolean LAZY_SPARKCTX_CREATION = true;
-       private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
-       private static boolean FAIR_SCHEDULER_MODE = true;
+       private static final boolean LAZY_SPARKCTX_CREATION = true;
+       private static final boolean ASYNCHRONOUS_VAR_DESTROY = true;
+       private static final boolean FAIR_SCHEDULER_MODE = true;
        
        //executor memory and relative fractions as obtained from the spark configuration
        private static SparkClusterConfig _sconf = null;
@@ -1152,12 +1152,12 @@ public class SparkExecutionContext extends ExecutionContext
         * 
         * @param bvar broadcast variable
         */
-       public void cleanupBroadcastVariable(Broadcast<?> bvar) 
+       public static void cleanupBroadcastVariable(Broadcast<?> bvar) 
        {
-               //in comparison to 'unpersist' (which would only delete the broadcast from the executors),
-               //this call also deletes related data from the driver.
+               //In comparison to 'unpersist' (which would only delete the broadcast 
+               //from the executors), this call also deletes related data from the driver.
                if( bvar.isValid() ) {
-                       bvar.destroy( ASYNCHRONOUS_VAR_DESTROY );
+                       bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY );
                }
        }
        
@@ -1168,10 +1168,10 @@ public class SparkExecutionContext extends ExecutionContext
         * 
         * @param rvar rdd variable to remove
         */
-       public void cleanupRDDVariable(JavaPairRDD<?,?> rvar) 
+       public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar) 
        {
                if( rvar.getStorageLevel()!=StorageLevel.NONE() ) {
-                       rvar.unpersist( ASYNCHRONOUS_VAR_DESTROY );
+                       rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY );
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f380b52a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
index 98e99dd..1a7aeb3 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 
 /**
  * This class is a wrapper around an array of broadcasts of partitioned matrix/frame blocks,
@@ -101,4 +102,12 @@ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable
                return ret;
        }
 
+       /**
+        * This method cleans up all underlying broadcasts of a partitioned broadcast
+        * by forwarding each of them to SparkExecutionContext.cleanupBroadcastVariable.
+        */
+       public void destroy() {
+               for( Broadcast<PartitionedBlock<T>> bvar : _pbc )
+                       SparkExecutionContext.cleanupBroadcastVariable(bvar);
+       }
 }

Reply via email to