Repository: systemml
Updated Branches:
  refs/heads/master 12ad5538a -> c10e509a7


[SYSTEMML-2380] Fix paramserv shutdown of agg service thread pool

Closes #782.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c10e509a
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c10e509a
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c10e509a

Branch: refs/heads/master
Commit: c10e509a78232f8cacfa9c7485395792d6af24e8
Parents: 12ad553
Author: EdgarLGB <[email protected]>
Authored: Mon Jun 11 12:58:26 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Jun 11 12:58:27 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/paramserv/ParamServer.java     |  6 +++++-
 .../cp/ParamservBuiltinCPInstruction.java         | 18 +++++++++++-------
 2 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/c10e509a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java
index 09b760f..7a39bec 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamServer.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.parser.DMLProgram;
@@ -71,7 +72,10 @@ public abstract class ParamServer {
                catch (InterruptedException e) {
                        throw new DMLRuntimeException("Param server: failed to broadcast the initial model.", e);
                }
-               _es = Executors.newSingleThreadExecutor();
+               BasicThreadFactory factory = new BasicThreadFactory.Builder()
+                       .namingPattern("agg-service-pool-thread-%d")
+                       .build();
+               _es = Executors.newSingleThreadExecutor(factory);
        }
 
        public abstract void push(int workerID, ListObject value);

http://git-wip-us.apache.org/repos/asf/systemml/blob/c10e509a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index fd5bc56..8fbc7cc 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -51,6 +51,7 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.Level;
@@ -111,7 +112,10 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
        public void processInstruction(ExecutionContext ec) {
                PSModeType mode = getPSMode();
                int workerNum = getWorkerNum(mode);
-               ExecutorService es = Executors.newFixedThreadPool(workerNum);
+               BasicThreadFactory factory = new BasicThreadFactory.Builder()
+                       .namingPattern("workers-pool-thread-%d")
+                       .build();
+               ExecutorService es = Executors.newFixedThreadPool(workerNum, factory);
                String updFunc = getParam(PS_UPDATE_FUN);
                String aggFunc = getParam(PS_AGGREGATION_FUN);
 
@@ -148,20 +152,20 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
                                mode, workerNum, freq, updateType, scheme));
                }
 
-               // Launch the worker threads and wait for completion
                try {
+                       // Launch the worker threads and wait for completion
                        for (Future<Void> ret : es.invokeAll(workers))
                                ret.get(); //error handling
+                       // Fetch the final model from ps
+                       ListObject result = ps.getResult();
+                       ec.setVariable(output.getName(), result);
                } catch (InterruptedException | ExecutionException e) {
                        throw new DMLRuntimeException("ParamservBuiltinCPInstruction: some error occurred: ", e);
                } finally {
                        es.shutdownNow();
+                       // Should shutdown the thread pool in param server
+                       ps.shutdown();
                }
-
-               // Fetch the final model from ps
-               ListObject result;
-               result = ps.getResult();
-               ec.setVariable(output.getName(), result);
        }
 
        private PSModeType getPSMode() {

Reply via email to