Repository: incubator-systemml Updated Branches: refs/heads/master 4c162afd9 -> a429e2df9
[SYSTEMML-1433] Fix unnecessarily blocking rdd and broadcast cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f380b52a Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f380b52a Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f380b52a Branch: refs/heads/master Commit: f380b52a9d041798b268de8007140b418af1e0db Parents: 4c162af Author: Matthias Boehm <mboe...@gmail.com> Authored: Wed Mar 22 21:27:00 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Thu Mar 23 13:39:37 2017 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 18 +++++++++--------- .../spark/data/PartitionedBroadcast.java | 9 +++++++++ 2 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f380b52a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 75a6334..106acc0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -85,9 +85,9 @@ public class SparkExecutionContext extends ExecutionContext private static final boolean LDEBUG = false; //local debug flag //internal configurations - private static boolean LAZY_SPARKCTX_CREATION = true; - private static boolean ASYNCHRONOUS_VAR_DESTROY = true; - private static boolean FAIR_SCHEDULER_MODE = true; + private static final boolean LAZY_SPARKCTX_CREATION = true; + 
private static final boolean ASYNCHRONOUS_VAR_DESTROY = true; + private static final boolean FAIR_SCHEDULER_MODE = true; //executor memory and relative fractions as obtained from the spark configuration private static SparkClusterConfig _sconf = null; @@ -1152,12 +1152,12 @@ public class SparkExecutionContext extends ExecutionContext * * @param bvar broadcast variable */ - public void cleanupBroadcastVariable(Broadcast<?> bvar) + public static void cleanupBroadcastVariable(Broadcast<?> bvar) { - //in comparison to 'unpersist' (which would only delete the broadcast from the executors), - //this call also deletes related data from the driver. + //In comparison to 'unpersist' (which would only delete the broadcast + //from the executors), this call also deletes related data from the driver. if( bvar.isValid() ) { - bvar.destroy( ASYNCHRONOUS_VAR_DESTROY ); + bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY ); } } @@ -1168,10 +1168,10 @@ public class SparkExecutionContext extends ExecutionContext * * @param rvar rdd variable to remove */ - public void cleanupRDDVariable(JavaPairRDD<?,?> rvar) + public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar) { if( rvar.getStorageLevel()!=StorageLevel.NONE() ) { - rvar.unpersist( ASYNCHRONOUS_VAR_DESTROY ); + rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY ); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f380b52a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java index 98e99dd..1a7aeb3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java @@ -24,6 +24,7 @@ import 
java.io.Serializable; import org.apache.spark.broadcast.Broadcast; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; /** * This class is a wrapper around an array of broadcasts of partitioned matrix/frame blocks, @@ -101,4 +102,12 @@ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable return ret; } + /** + * This method cleans up all underlying broadcasts of a partitioned broadcast, + * by forwarding the calls to SparkExecutionContext.cleanupBroadcastVariable. + */ + public void destroy() { + for( Broadcast<PartitionedBlock<T>> bvar : _pbc ) + SparkExecutionContext.cleanupBroadcastVariable(bvar); + } }