Repository: systemml Updated Branches: refs/heads/master a0b0e80e9 -> 159522a1f
[SYSTEMML-2238] Reuse of local parfor fair scheduler pool names This patch fixes a leak of fair scheduler pools in scripts with repeatedly executed or simply many parfor loops by reusing scheduler pool names. So far, we mistakenly assumed that disassociated pools are automatically cleaned up by Spark. The set of names is initially sized to the number of vcores, but scheduler pools are only allocated on demand by parfor workers when in Spark execution mode and when there are large operations in the parfor body. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/93506b23 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/93506b23 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/93506b23 Branch: refs/heads/master Commit: 93506b2317a520d3f816b5d71d25d10a73374bef Parents: a0b0e80 Author: Matthias Boehm <mboe...@gmail.com> Authored: Sun Apr 8 12:49:37 2018 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sun Apr 8 12:49:37 2018 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 53 ++++++--- .../controlprogram/parfor/LocalParWorker.java | 114 +++++++++---------- 2 files changed, 89 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/93506b23/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 98c3eaa..325a359 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.LongStream; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.LongWritable; @@ -92,10 +93,9 @@ public class SparkExecutionContext extends ExecutionContext private static final boolean LDEBUG = false; //local debug flag //internal configurations - private static boolean LAZY_SPARKCTX_CREATION = true; - private static boolean ASYNCHRONOUS_VAR_DESTROY = true; - - public static boolean FAIR_SCHEDULER_MODE = true; + private static final boolean LAZY_SPARKCTX_CREATION = true; + private static final boolean ASYNCHRONOUS_VAR_DESTROY = true; + public static final boolean FAIR_SCHEDULER_MODE = true; //executor memory and relative fractions as obtained from the spark configuration private static SparkClusterConfig _sconf = null; @@ -107,8 +107,12 @@ public class SparkExecutionContext extends ExecutionContext //10% of JVM max heap size for parallelized RDDs; if this is not sufficient, //matrices or frames are exported to HDFS and the RDDs are created from files. //TODO unify memory management for CP, par RDDs, and potentially broadcasts - private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1); - + private static final MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1); + + //pool of reused fair scheduler pool names (unset bits indicate availability) + private static boolean[] _poolBuff = FAIR_SCHEDULER_MODE ? 
+ new boolean[InfrastructureAnalyzer.getLocalParallelism()] : null; + static { // for internal debugging only if( LDEBUG ) { @@ -117,11 +121,10 @@ public class SparkExecutionContext extends ExecutionContext } } - protected SparkExecutionContext(boolean allocateVars, Program prog) - { + protected SparkExecutionContext(boolean allocateVars, Program prog) { //protected constructor to force use of ExecutionContextFactory super( allocateVars, prog ); - + //spark context creation via internal initializer if( !LAZY_SPARKCTX_CREATION || DMLScript.rtplatform==RUNTIME_PLATFORM.SPARK ) { initSparkContext(); @@ -134,8 +137,7 @@ public class SparkExecutionContext extends ExecutionContext * * @return java spark context */ - public JavaSparkContext getSparkContext() - { + public JavaSparkContext getSparkContext() { //lazy spark context creation on demand (lazy instead of asynchronous //to avoid wait for uninitialized spark context on close) if( LAZY_SPARKCTX_CREATION ) { @@ -1232,19 +1234,40 @@ public class SparkExecutionContext extends ExecutionContext in.count(); //trigger caching to prevent contention } - public void setThreadLocalSchedulerPool(String poolName) { + public int setThreadLocalSchedulerPool() { + int pool = -1; if( FAIR_SCHEDULER_MODE ) { + pool = allocSchedulerPoolName(); getSparkContext().sc().setLocalProperty( - "spark.scheduler.pool", poolName); + "spark.scheduler.pool", "parforPool"+pool); } + return pool; } - public void cleanupThreadLocalSchedulerPool() { + public void cleanupThreadLocalSchedulerPool(int pool) { if( FAIR_SCHEDULER_MODE ) { + freeSchedulerPoolName(pool); getSparkContext().sc().setLocalProperty( - "spark.scheduler.pool", null); + "spark.scheduler.pool", null); } } + + private static synchronized int allocSchedulerPoolName() { + int pool = ArrayUtils.indexOf(_poolBuff, false); + //grow pool on demand + if( pool < 0 ) { + pool = _poolBuff.length; + _poolBuff = Arrays.copyOf(_poolBuff, + (int)Math.min(2L*pool, Integer.MAX_VALUE)); + } + //mark 
pool name for in use + _poolBuff[pool] = true; + return pool; + } + + private static synchronized void freeSchedulerPoolName(int pool) { + _poolBuff[pool] = false; + } private boolean isRDDMarkedForCaching( int rddID ) { JavaSparkContext jsc = getSparkContext(); http://git-wip-us.apache.org/repos/asf/systemml/blob/93506b23/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java index f77c22e..058026c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java @@ -41,20 +41,15 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; */ public class LocalParWorker extends ParWorker implements Runnable { - protected LocalTaskQueue<Task> _taskQueue = null; - + protected final LocalTaskQueue<Task> _taskQueue; + protected final CompilerConfig _cconf; + protected final boolean _stopped; + protected final int _max_retry; protected Collection<String> _fnNames = null; - protected CompilerConfig _cconf = null; - protected boolean _stopped = false; - protected int _max_retry = -1; - - public LocalParWorker( long ID, LocalTaskQueue<Task> q, ParForBody body, CompilerConfig cconf, int max_retry, boolean monitor ) - { + public LocalParWorker( long ID, LocalTaskQueue<Task> q, ParForBody body, CompilerConfig cconf, int max_retry, boolean monitor ) { super(ID, body, monitor); - _taskQueue = q; - _cconf = cconf; _stopped = false; _max_retry = max_retry; @@ -76,10 +71,11 @@ public class LocalParWorker extends ParWorker implements Runnable //setup fair scheduler pool for worker thread, but avoid unnecessary //spark context creation (if data cached already created) + int pool = -1; if( 
OptimizerUtils.isSparkExecutionMode() && SparkExecutionContext.isSparkContextCreated() ) { SparkExecutionContext sec = (SparkExecutionContext)_ec; - sec.setThreadLocalSchedulerPool("parforPool"+_workerID); + pool = sec.setThreadLocalSchedulerPool(); } // Initialize this GPUContext to this thread @@ -98,60 +94,54 @@ public class LocalParWorker extends ParWorker implements Runnable // continuous execution (execute tasks until (1) stopped or (2) no more tasks) Task lTask = null; - - while( !_stopped ) - { - //dequeue the next task (abort on NO_MORE_TASKS or error) - try - { - lTask = _taskQueue.dequeueTask(); - - if( lTask == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks) - break; //normal end of parallel worker - } - catch(Exception ex) - { - // abort on taskqueue error - LOG.warn("Error reading from task queue: "+ex.getMessage()); - LOG.warn("Stopping LocalParWorker."); - break; //no exception thrown to prevent blocking on join - } - - //execute the task sequentially (re-try on error) - boolean success = false; - int retrys = _max_retry; - - while( !success ) - { - try - { - /////// - //core execution (see ParWorker) - executeTask( lTask ); - success = true; - } - catch (Exception ex) - { - LOG.error("Failed to execute "+lTask.toString()+", retry:"+retrys, ex); + try { + while( !_stopped ) { + //dequeue the next task (abort on NO_MORE_TASKS or error) + try { + lTask = _taskQueue.dequeueTask(); - if( retrys > 0 ) - retrys--; //retry on task error - else - { - // abort on no remaining retrys - LOG.error("Error executing task: ",ex); - LOG.error("Stopping LocalParWorker."); - break; //no exception thrown to prevent blocking on join + if( lTask == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks) + break; //normal end of parallel worker + } + catch(Exception ex) { + // abort on taskqueue error + LOG.warn("Error reading from task queue: "+ex.getMessage()); + LOG.warn("Stopping LocalParWorker."); + break; //no exception thrown to 
prevent blocking on join + } + + //execute the task sequentially (re-try on error) + boolean success = false; + int retrys = _max_retry; + + while( !success ) { + try { + /////// + //core execution (see ParWorker) + executeTask( lTask ); + success = true; + } + catch (Exception ex) { + LOG.error("Failed to execute "+lTask.toString()+", retry:"+retrys, ex); + + if( retrys > 0 ) + retrys--; //retry on task error + else { + // abort on no remaining retrys + LOG.error("Error executing task: ",ex); + LOG.error("Stopping LocalParWorker."); + break; //no exception thrown to prevent blocking on join + } } } } - } - - //setup fair scheduler pool for worker thread - if( OptimizerUtils.isSparkExecutionMode() - && SparkExecutionContext.isSparkContextCreated() ) { - SparkExecutionContext sec = (SparkExecutionContext)_ec; - sec.cleanupThreadLocalSchedulerPool(); + } + finally { + //cleanup fair scheduler pool for worker thread + if( OptimizerUtils.isSparkExecutionMode() ) { + SparkExecutionContext sec = (SparkExecutionContext)_ec; + sec.cleanupThreadLocalSchedulerPool(pool); + } } if( _monitor ) { @@ -161,5 +151,3 @@ public class LocalParWorker extends ParWorker implements Runnable } } } - -