Repository: reef
Updated Branches:
  refs/heads/master a105042a9 -> 45cae485e


[REEF-1627] Assign name and thread group to each thread that manages local 
containers on the Driver side

JIRA:
  [REEF-1627](https://issues.apache.org/jira/browse/REEF-1627)

Pull Request:
  This closes #1144


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/45cae485
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/45cae485
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/45cae485

Branch: refs/heads/master
Commit: 45cae485ebfaf5c30d8847c50c2a8e13e9782f1d
Parents: a105042
Author: Sergiy Matusevych <[email protected]>
Authored: Fri Sep 30 13:11:27 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Sep 30 18:17:24 2016 -0700

----------------------------------------------------------------------
 .../runtime/local/driver/ContainerManager.java  |  4 +++-
 .../runtime/local/driver/ProcessContainer.java  | 19 ++++++++++------
 .../standalone/driver/RemoteNodeManager.java    |  6 ++++-
 .../standalone/driver/SshProcessContainer.java  | 24 +++++++++++---------
 4 files changed, 33 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index 3184fc5..fbf038e 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -62,6 +62,8 @@ final class ContainerManager implements AutoCloseable {
 
   private static final Collection<String> DEFAULT_RACKS = 
Collections.singletonList(RackNames.DEFAULT_RACK_NAME);
 
+  private final ThreadGroup containerThreads = new 
ThreadGroup("LocalContainerManagerThreadGroup");
+
   /**
    * Map from containerID -> Container.
    */
@@ -369,7 +371,7 @@ final class ContainerManager implements AutoCloseable {
 
     final ProcessContainer container = new ProcessContainer(
         this.errorHandlerRID, nodeId, processID, processFolder, megaBytes,
-        numberOfCores, rackName, this.fileNames, this.processObserver);
+        numberOfCores, rackName, this.fileNames, this.processObserver, 
this.containerThreads);
 
     this.containers.put(container.getContainerID(), container);
     LOG.log(Level.FINE, "Allocated {0}", container.getContainerID());

http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
index ec70502..160de05 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
@@ -51,10 +51,10 @@ public final class ProcessContainer implements Container {
   private final int numberOfCores;
   private final String rackName;
   private final REEFFileNames fileNames;
-  private final File reefFolder;
   private final File localFolder;
   private final File globalFolder;
   private final RunnableProcessObserver processObserver;
+  private final ThreadGroup threadGroup;
   private Thread theThread;
   private RunnableProcess process;
 
@@ -72,7 +72,9 @@ public final class ProcessContainer implements Container {
                    final int numberOfCores,
                    final String rackName,
                    final REEFFileNames fileNames,
-                   final ReefRunnableProcessObserver processObserver) {
+                   final ReefRunnableProcessObserver processObserver,
+                   final ThreadGroup threadGroup) {
+
     this.errorHandlerRID = errorHandlerRID;
     this.nodeID = nodeID;
     this.containedID = containedID;
@@ -82,11 +84,15 @@ public final class ProcessContainer implements Container {
     this.rackName = rackName;
     this.fileNames = fileNames;
     this.processObserver = processObserver;
-    this.reefFolder = new File(folder, fileNames.getREEFFolderName());
+    this.threadGroup = threadGroup;
+
+    final File reefFolder = new File(folder, fileNames.getREEFFolderName());
+
     this.localFolder = new File(reefFolder, fileNames.getLocalFolderName());
     if (!this.localFolder.exists() && !this.localFolder.mkdirs()) {
       LOG.log(Level.WARNING, "Failed to create [{0}]", 
this.localFolder.getAbsolutePath());
     }
+
     this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName());
     if (!this.globalFolder.exists() && !this.globalFolder.mkdirs()) {
       LOG.log(Level.WARNING, "Failed to create [{0}]", 
this.globalFolder.getAbsolutePath());
@@ -115,10 +121,9 @@ public final class ProcessContainer implements Container {
   }
 
   @Override
-  @SuppressWarnings("checkstyle:hiddenfield")
-  public void addGlobalFiles(final File globalFolder) {
+  public void addGlobalFiles(final File globalFilesFolder) {
     try {
-      final File[] files = globalFolder.listFiles();
+      final File[] files = globalFilesFolder.listFiles();
       if (files != null) {
         copy(Arrays.asList(files), this.globalFolder);
       }
@@ -135,7 +140,7 @@ public final class ProcessContainer implements Container {
         this.processObserver,
         this.fileNames.getEvaluatorStdoutFileName(),
         this.fileNames.getEvaluatorStderrFileName());
-    this.theThread = new Thread(this.process);
+    this.theThread = new Thread(this.threadGroup, this.process, 
this.containedID);
     this.theThread.start();
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java
 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java
index a66353c..1a8a411 100644
--- 
a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java
+++ 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java
@@ -54,8 +54,11 @@ import java.util.logging.Logger;
  * Management module for remote nodes in standalone runtime.
  */
 public final class RemoteNodeManager {
+
   private static final Logger LOG = 
Logger.getLogger(RemoteNodeManager.class.getName());
 
+  private final ThreadGroup containerThreads = new 
ThreadGroup("SshContainerManagerThreadGroup");
+
   /**
    * Map from containerID -> SshProcessContainer.
    */
@@ -248,7 +251,8 @@ public final class RemoteNodeManager {
 
     final SshProcessContainer sshProcessContainer = new 
SshProcessContainer(errorHandlerRID, nodeId, processID,
         processFolder, resourceRequestEvent.getMemorySize().get(), 
resourceRequestEvent.getVirtualCores().get(),
-        null, this.fileNames, nodeFolder, processObserver);
+        null, this.fileNames, this.nodeFolder, this.processObserver, 
this.containerThreads);
+
     this.containers.put(processID, sshProcessContainer);
 
     final ResourceAllocationEvent alloc = 
ResourceEventImpl.newAllocationBuilder()

http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java
 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java
index c2c5501..479ce99 100644
--- 
a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java
+++ 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java
@@ -47,6 +47,7 @@ import java.util.logging.Logger;
 @Private
 @TaskSide
 final class SshProcessContainer implements Container {
+
   private static final Logger LOG = 
Logger.getLogger(SshProcessContainer.class.getName());
 
   private final String errorHandlerRID;
@@ -57,9 +58,9 @@ final class SshProcessContainer implements Container {
   private final int numberOfCores;
   private final String rackName;
   private final REEFFileNames fileNames;
-  private final File reefFolder;
   private final File localFolder;
   private final File globalFolder;
+  private final ThreadGroup threadGroup;
   private Thread theThread;
   private final ReefRunnableProcessObserver processObserver;
   private RunnableProcess process;
@@ -83,7 +84,9 @@ final class SshProcessContainer implements Container {
                       final String rackName,
                       final REEFFileNames fileNames,
                       final String nodeFolder,
-                      final ReefRunnableProcessObserver processObserver) {
+                      final ReefRunnableProcessObserver processObserver,
+                      final ThreadGroup threadGroup) {
+
     this.errorHandlerRID = errorHandlerRID;
     this.processObserver = processObserver;
     this.nodeID = nodeID;
@@ -94,11 +97,15 @@ final class SshProcessContainer implements Container {
     this.rackName = rackName;
     this.fileNames = fileNames;
     this.nodeFolder = nodeFolder;
-    this.reefFolder = new File(folder, fileNames.getREEFFolderName());
+    this.threadGroup = threadGroup;
+
+    final File reefFolder = new File(folder, fileNames.getREEFFolderName());
+
     this.localFolder = new File(reefFolder, fileNames.getLocalFolderName());
     if (!this.localFolder.exists() && !this.localFolder.mkdirs()) {
       LOG.log(Level.WARNING, "Failed to create [{0}]", 
this.localFolder.getAbsolutePath());
     }
+
     this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName());
     if (!this.globalFolder.exists() && !this.globalFolder.mkdirs()) {
       LOG.log(Level.WARNING, "Failed to create [{0}]", 
this.globalFolder.getAbsolutePath());
@@ -113,7 +120,7 @@ final class SshProcessContainer implements Container {
         this.processObserver,
         this.fileNames.getEvaluatorStdoutFileName(),
         this.fileNames.getEvaluatorStderrFileName());
-    this.theThread = new Thread(this.process);
+    this.theThread = new Thread(this.threadGroup, this.process, 
this.containedID);
     this.theThread.start();
   }
 
@@ -127,10 +134,9 @@ final class SshProcessContainer implements Container {
   }
 
   @Override
-  @SuppressWarnings("checkstyle:hiddenfield")
-  public void addGlobalFiles(final File globalFolder) {
+  public void addGlobalFiles(final File globalFilesFolder) {
     try {
-      final File[] files = globalFolder.listFiles();
+      final File[] files = globalFilesFolder.listFiles();
       if (files != null) {
         copy(Arrays.asList(files), this.globalFolder);
       }
@@ -203,7 +209,6 @@ final class SshProcessContainer implements Container {
         ", folder=" + folder + '\'' +
         ", rack=" + rackName +
         "}";
-
   }
 
   SshProcessContainer withRemoteConnection(final Session newRemoteSession, 
final String newRemoteHostName) {
@@ -255,7 +260,4 @@ final class SshProcessContainer implements Container {
           this.remoteHostName + " with the pwd command", ex);
     }
   }
-
-
-
 }

Reply via email to