Repository: reef Updated Branches: refs/heads/master a105042a9 -> 45cae485e
[REEF-1627] Assign name and thread group to each thread that manages local containers on the Driver side JIRA: [REEF-1627](https://issues.apache.org/jira/browse/REEF-1627) Pull Request: This closes #1144 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/45cae485 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/45cae485 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/45cae485 Branch: refs/heads/master Commit: 45cae485ebfaf5c30d8847c50c2a8e13e9782f1d Parents: a105042 Author: Sergiy Matusevych <[email protected]> Authored: Fri Sep 30 13:11:27 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Sep 30 18:17:24 2016 -0700 ---------------------------------------------------------------------- .../runtime/local/driver/ContainerManager.java | 4 +++- .../runtime/local/driver/ProcessContainer.java | 19 ++++++++++------ .../standalone/driver/RemoteNodeManager.java | 6 ++++- .../standalone/driver/SshProcessContainer.java | 24 +++++++++++--------- 4 files changed, 33 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java index 3184fc5..fbf038e 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java @@ -62,6 +62,8 @@ final class ContainerManager implements AutoCloseable { private static final Collection<String> DEFAULT_RACKS = Collections.singletonList(RackNames.DEFAULT_RACK_NAME); + private final ThreadGroup containerThreads = new ThreadGroup("LocalContainerManagerThreadGroup"); + /** * Map from containerID -> Container. */ @@ -369,7 +371,7 @@ final class ContainerManager implements AutoCloseable { final ProcessContainer container = new ProcessContainer( this.errorHandlerRID, nodeId, processID, processFolder, megaBytes, - numberOfCores, rackName, this.fileNames, this.processObserver); + numberOfCores, rackName, this.fileNames, this.processObserver, this.containerThreads); this.containers.put(container.getContainerID(), container); LOG.log(Level.FINE, "Allocated {0}", container.getContainerID()); http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java index ec70502..160de05 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java @@ -51,10 +51,10 @@ public final class ProcessContainer implements Container { private final int numberOfCores; private final String rackName; private final REEFFileNames fileNames; - private final File reefFolder; private final File localFolder; private final File globalFolder; private final RunnableProcessObserver processObserver; + private final ThreadGroup threadGroup; private Thread theThread; private RunnableProcess process; @@ -72,7 +72,9 @@ public final class ProcessContainer implements Container { final int numberOfCores, final String rackName, final REEFFileNames fileNames, - final ReefRunnableProcessObserver processObserver) { + final ReefRunnableProcessObserver processObserver, + final ThreadGroup threadGroup) { + this.errorHandlerRID = errorHandlerRID; this.nodeID = nodeID; this.containedID = containedID; @@ -82,11 +84,15 @@ public final class ProcessContainer implements Container { this.rackName = rackName; this.fileNames = fileNames; this.processObserver = processObserver; - this.reefFolder = new File(folder, fileNames.getREEFFolderName()); + this.threadGroup = threadGroup; + + final File reefFolder = new File(folder, fileNames.getREEFFolderName()); + this.localFolder = new File(reefFolder, fileNames.getLocalFolderName()); if (!this.localFolder.exists() && !this.localFolder.mkdirs()) { LOG.log(Level.WARNING, "Failed to create [{0}]", this.localFolder.getAbsolutePath()); } + this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName()); if (!this.globalFolder.exists() && !this.globalFolder.mkdirs()) { LOG.log(Level.WARNING, "Failed to create [{0}]", this.globalFolder.getAbsolutePath()); @@ -115,10 +121,9 @@ public final class ProcessContainer implements Container { } @Override - @SuppressWarnings("checkstyle:hiddenfield") - public void addGlobalFiles(final File globalFolder) { + public void addGlobalFiles(final File globalFilesFolder) { try { - final File[] files = globalFolder.listFiles(); + final File[] files = globalFilesFolder.listFiles(); if (files != null) { copy(Arrays.asList(files), this.globalFolder); } @@ -135,7 +140,7 @@ public final class ProcessContainer implements Container { this.processObserver, this.fileNames.getEvaluatorStdoutFileName(), this.fileNames.getEvaluatorStderrFileName()); - this.theThread = new Thread(this.process); + this.theThread = new Thread(this.threadGroup, this.process, this.containedID); this.theThread.start(); } http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java index a66353c..1a8a411 100644 --- a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java @@ -54,8 +54,11 @@ import java.util.logging.Logger; * Management module for remote nodes in standalone runtime. */ public final class RemoteNodeManager { + private static final Logger LOG = Logger.getLogger(RemoteNodeManager.class.getName()); + private final ThreadGroup containerThreads = new ThreadGroup("SshContainerManagerThreadGroup"); + /** * Map from containerID -> SshProcessContainer. */ @@ -248,7 +251,8 @@ public final class RemoteNodeManager { final SshProcessContainer sshProcessContainer = new SshProcessContainer(errorHandlerRID, nodeId, processID, processFolder, resourceRequestEvent.getMemorySize().get(), resourceRequestEvent.getVirtualCores().get(), - null, this.fileNames, nodeFolder, processObserver); + null, this.fileNames, this.nodeFolder, this.processObserver, this.containerThreads); + this.containers.put(processID, sshProcessContainer); final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder() http://git-wip-us.apache.org/repos/asf/reef/blob/45cae485/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java index c2c5501..479ce99 100644 --- a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java @@ -47,6 +47,7 @@ import java.util.logging.Logger; @Private @TaskSide final class SshProcessContainer implements Container { + private static final Logger LOG = Logger.getLogger(SshProcessContainer.class.getName()); private final String errorHandlerRID; @@ -57,9 +58,9 @@ final class SshProcessContainer implements Container { private final int numberOfCores; private final String rackName; private final REEFFileNames fileNames; - private final File reefFolder; private final File localFolder; private final File globalFolder; + private final ThreadGroup threadGroup; private Thread theThread; private final ReefRunnableProcessObserver processObserver; private RunnableProcess process; @@ -83,7 +84,9 @@ final class SshProcessContainer implements Container { final String rackName, final REEFFileNames fileNames, final String nodeFolder, - final ReefRunnableProcessObserver processObserver) { + final ReefRunnableProcessObserver processObserver, + final ThreadGroup threadGroup) { + this.errorHandlerRID = errorHandlerRID; this.processObserver = processObserver; this.nodeID = nodeID; @@ -94,11 +97,15 @@ final class SshProcessContainer implements Container { this.rackName = rackName; this.fileNames = fileNames; this.nodeFolder = nodeFolder; - this.reefFolder = new File(folder, fileNames.getREEFFolderName()); + this.threadGroup = threadGroup; + + final File reefFolder = new File(folder, fileNames.getREEFFolderName()); + this.localFolder = new File(reefFolder, fileNames.getLocalFolderName()); if (!this.localFolder.exists() && !this.localFolder.mkdirs()) { LOG.log(Level.WARNING, "Failed to create [{0}]", this.localFolder.getAbsolutePath()); } + this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName()); if (!this.globalFolder.exists() && !this.globalFolder.mkdirs()) { LOG.log(Level.WARNING, "Failed to create [{0}]", this.globalFolder.getAbsolutePath()); @@ -113,7 +120,7 @@ final class SshProcessContainer implements Container { this.processObserver, this.fileNames.getEvaluatorStdoutFileName(), this.fileNames.getEvaluatorStderrFileName()); - this.theThread = new Thread(this.process); + this.theThread = new Thread(this.threadGroup, this.process, this.containedID); this.theThread.start(); } @@ -127,10 +134,9 @@ final class SshProcessContainer implements Container { } @Override - @SuppressWarnings("checkstyle:hiddenfield") - public void addGlobalFiles(final File globalFolder) { + public void addGlobalFiles(final File globalFilesFolder) { try { - final File[] files = globalFolder.listFiles(); + final File[] files = globalFilesFolder.listFiles(); if (files != null) { copy(Arrays.asList(files), this.globalFolder); } @@ -203,7 +209,6 @@ final class SshProcessContainer implements Container { ", folder=" + folder + '\'' + ", rack=" + rackName + "}"; - } SshProcessContainer withRemoteConnection(final Session newRemoteSession, final String newRemoteHostName) { @@ -255,7 +260,4 @@ final class SshProcessContainer implements Container { this.remoteHostName + " with the pwd command", ex); } } - - - }
