Repository: systemml Updated Branches: refs/heads/master 12ad5538a -> c10e509a7
[SYSTEMML-2380] Fix paramserv shutdown of agg service thread pool Closes #782. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c10e509a Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c10e509a Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c10e509a Branch: refs/heads/master Commit: c10e509a78232f8cacfa9c7485395792d6af24e8 Parents: 12ad553 Author: EdgarLGB <[email protected]> Authored: Mon Jun 11 12:58:26 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Jun 11 12:58:27 2018 -0700 ---------------------------------------------------------------------- .../controlprogram/paramserv/ParamServer.java | 6 +++++- .../cp/ParamservBuiltinCPInstruction.java | 18 +++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/c10e509a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java index 09b760f..7a39bec 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sysml.parser.DMLProgram; @@ -71,7 +72,10 @@ public abstract class ParamServer { catch (InterruptedException e) { throw new DMLRuntimeException("Param server: failed to broadcast the initial model.", e); } - _es = Executors.newSingleThreadExecutor(); + BasicThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern("agg-service-pool-thread-%d") + .build(); + _es = Executors.newSingleThreadExecutor(factory); } public abstract void push(int workerID, ListObject value); http://git-wip-us.apache.org/repos/asf/systemml/blob/c10e509a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java index fd5bc56..8fbc7cc 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java @@ -51,6 +51,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.Level; @@ -111,7 +112,10 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc public void processInstruction(ExecutionContext ec) { PSModeType mode = getPSMode(); int workerNum = getWorkerNum(mode); - ExecutorService es = Executors.newFixedThreadPool(workerNum); + BasicThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern("workers-pool-thread-%d") + .build(); + ExecutorService es = Executors.newFixedThreadPool(workerNum, factory); String updFunc = getParam(PS_UPDATE_FUN); String aggFunc = getParam(PS_AGGREGATION_FUN); @@ -148,20 +152,20 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc mode, workerNum, freq, updateType, scheme)); } - // Launch the worker threads and wait for completion try { + // Launch the worker threads and wait for completion for (Future<Void> ret : es.invokeAll(workers)) ret.get(); //error handling + // Fetch the final model from ps + ListObject result = ps.getResult(); + ec.setVariable(output.getName(), result); } catch (InterruptedException | ExecutionException e) { throw new DMLRuntimeException("ParamservBuiltinCPInstruction: some error occurred: ", e); } finally { es.shutdownNow(); + // Should shutdown the thread pool in param server + ps.shutdown(); } - - // Fetch the final model from ps - ListObject result; - result = ps.getResult(); - ec.setVariable(output.getName(), result); } private PSModeType getPSMode() {
