Author: cwiklik
Date: Thu Apr 18 19:41:46 2013
New Revision: 1469537

URL: http://svn.apache.org/r1469537
Log:
UIMA-2804 Modified to perform cgroup cleanup on agent startup, correctly report 
major swap faults

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java?rev=1469537&r1=1469536&r2=1469537&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
 Thu Apr 18 19:41:46 2013
@@ -139,7 +139,7 @@ public class NodeAgent extends AbstractD
 
   public int shareQuantum;
   
-  public boolean virtualAgent = true;
+  public boolean virtualAgent = false;
   
   /**
    * Ctor used exclusively for black-box testing of this class.
@@ -194,7 +194,10 @@ public class NodeAgent extends AbstractD
                        exclusionParser.parse(exclusionFile);
                        excludeNodeFromCGroups = 
exclusionParser.cgroupsExcluded();
                        excludeAPs = exclusionParser.apExcluded();
-                       
+                       if ( excludeNodeFromCGroups ) {
+                               logger.info("nodeAgent", null,
+                            "------- Node Explicitly Excluded From Using 
CGroups. Check File:"+exclusionFile);
+                       }
                        
System.out.println("excludeNodeFromCGroups="+excludeNodeFromCGroups+" 
excludeAPs="+excludeAPs);
                 } else {
                         System.out.println("Running with No exclusion File");
@@ -220,6 +223,13 @@ public class NodeAgent extends AbstractD
                        useCgroups = true;
                        logger.info("nodeAgent", null,
                             "------- Agent Running with CGroups Enabled");
+                       try {
+                               // remove stale CGroups
+                               cgroupsManager.cleanupOnStartup();
+                       } catch( Exception e) {
+                               logger.error("nodeAgent", null,e);
+                                
+                       }
                        
                        } else {
                                logger.info("nodeAgent", null,

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java?rev=1469537&r1=1469536&r2=1469537&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
 Thu Apr 18 19:41:46 2013
@@ -2,9 +2,16 @@ package org.apache.uima.ducc.agent.launc
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
+import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.Utils;
@@ -12,110 +19,307 @@ import org.apache.uima.ducc.common.utils
 /**
  * Manages cgroup container on a node
  * 
- * Supported operations:
- *   - cgcreate - creates cgroup container
- *   - cgset - sets max memory limit for an existing container
- *   
- *
+ * Supported operations: - cgcreate - creates cgroup container - cgset - sets
+ * max memory limit for an existing container
+ * 
+ * 
  */
 public class CGroupsManager {
        private DuccLogger agentLogger = null;
-       
+
        private Set<String> containerIds = new LinkedHashSet<String>();
        private String cgroupBaseDir = "";
-       private String cgroupSubsystems = "";  // comma separated list of 
subsystems eg. memory,cpu
+       private String cgroupSubsystems = ""; // comma separated list of 
subsystems
+                                                                               
        // eg. memory,cpu
+
        /**
         * @param args
         */
        public static void main(String[] args) {
                try {
-                       
-                       CGroupsManager cgMgr = new 
CGroupsManager("/cgroup/ducc", "memory", null);
-                       System.out.println("Cgroups 
Installed:"+cgMgr.cgroupExists("/cgroup/ducc"));
+
+                       CGroupsManager cgMgr = new 
CGroupsManager("/cgroup/ducc", "memory",
+                                       null);
+                       System.out.println("Cgroups Installed:"
+                                       + cgMgr.cgroupExists("/cgroup/ducc"));
                        Set<String> containers = 
cgMgr.collectExistingContainers();
-                       for ( String containerId : containers ) {
-                               System.out.println("Existing CGroup Container 
ID:"+containerId);
+                       for (String containerId : containers) {
+                               System.out.println("Existing CGroup Container 
ID:"
+                                               + containerId);
                        }
                        cgMgr.createContainer(args[0], args[2], true);
-                       cgMgr.setContainerMaxMemoryLimit(args[0], args[2], 
true, Long.parseLong(args[1]));
-                   synchronized( cgMgr ) {
-                       cgMgr.wait(60000);
-                   }
-                   cgMgr.destroyContainer(args[0]);
-                   
-               } catch( Exception e) {
+                       cgMgr.setContainerMaxMemoryLimit(args[0], args[2], true,
+                                       Long.parseLong(args[1]));
+                       synchronized (cgMgr) {
+                               cgMgr.wait(60000);
+                       }
+                       cgMgr.destroyContainer(args[0]);
+
+               } catch (Exception e) {
                        e.printStackTrace();
                }
        }
-       
-       public CGroupsManager(String cgroupBaseDir, String cgroupSubsystems, 
DuccLogger agentLogger ) {
+
+       public CGroupsManager(String cgroupBaseDir, String cgroupSubsystems,
+                       DuccLogger agentLogger) {
                this.cgroupBaseDir = cgroupBaseDir;
                this.cgroupSubsystems = cgroupSubsystems;
                this.agentLogger = agentLogger;
        }
-    /**
-     * Creates cgroup container with a given id and owner.
-     * 
-     * @param containerId - new cgroup container id
-     * @param userId - owner of the cgroup container
-     * @param useDuccSpawn - use duccling to run 'cgcreate' command
-     * 
-     * @return - true on success, false otherwise
-     * 
-     * @throws Exception
-     */
-       public boolean createContainer(String containerId, String userId, 
boolean useDuccSpawn ) throws Exception {
+
+       private String[] readPids(File f) throws Exception {
+               List<String> pids = new ArrayList<String>();
+               BufferedReader br = new BufferedReader(new FileReader(f));
+               String line;
+               while ((line = br.readLine()) != null) {
+                       pids.add(line.trim());
+               }
+               br.close();
+               return pids.toArray(new String[pids.size()]);
+       }
+       /**
+        * Finds all stale CGroups in /cgroup/ducc folder and cleans them
+        * up. The code only cleans up cgroups folders with names that follow
+        * ducc's cgroup naming convention: <id>.<id>.<id>.
+        * First, each cgroup is checked for still running processes in the
+        * cgroup by looking at /cgroup/ducc/<id>/cgroup.procs file which
+        * includes PIDs of processes associated with the cgroups. If 
+        * processes are found, each one is killed via -9 and the cgroup
+        * is removed.
+        * 
+        * @throws Exception
+        */
+       public void cleanupOnStartup() throws Exception {
+
+               Set<NodeProcessInfo> processes = getProcessesOnNode();
+               // Match any folder under /cgroup/ducc that has syntax
+               // <number>.<number>.<number>
+               // This syntax is assigned by ducc to each cgroup
+               Pattern p = Pattern.compile("((\\d+)\\.(\\d+)\\.(\\d+))");
+
+               File cgroupsFolder = new File(cgroupBaseDir);
+               String[] files = cgroupsFolder.list();
                
+               for (String cgroupFolder : files) {
+                       Matcher m = p.matcher(cgroupFolder);
+                       //      only look at ducc's cgroups
+                       if (m.find()) {
+                               try {
+                                       // open proc file which may include 
PIDs if processes are 
+                                       // still running
+                                       File f = new File(cgroupBaseDir + "/" + 
cgroupFolder
+                                                       + "/cgroup.procs");
+                                       //      collect all pids
+                                       String[] pids = readPids(f);
+                                       // kill each running process via -9
+                                       if (pids != null && pids.length > 0) {
+                                               for (String pid : pids) {
+                                                       // Got cgroup processes 
still running. Kill them
+                                                       for (NodeProcessInfo 
proc : processes) {
+                                                               if 
(proc.getPid().equals(pid)) {
+                                                                       
+                                                                       kill( 
proc.getUserid(), proc.getPid());
+                                                               }
+                                                       }
+                                               }
+                                               // it may take some time for 
the cgroups to update accounting. Just cycle until
+                                               // the procs file becomes empty 
under a given cgroup
+                                               while( true ) {
+                                                       pids = readPids(f);
+                                                       if ( pids == null || 
pids.length == 0) {
+                                                               break;
+                                                       } else {
+                                                               try {
+                                                                       
synchronized(this) {
+                                                                               
agentLogger.info("cleanupOnStartup", null,
+                                                                               
                "--- CGroup:" + cgroupFolder+ " procs file still showing 
processes running. Wait until CGroups updates acccounting");
+                                                                               
wait(200);
+                                                                               
+                                                                       }
+                                                               } catch( 
InterruptedException ee) {}
+                                                       }
+                                               }
+                                       }
+                                       
+                                       destroyContainer(cgroupFolder);
+                                       agentLogger.info("cleanupOnStartup", 
null,
+                                                       "--- Agent Removed 
Empty CGroup:" + cgroupFolder);
+                               } catch (Exception e) {
+                                       agentLogger.error("cleanupOnStartup", 
null, e);
+                               }
+                       }
+               }
+       }
+
+       public void kill(final String user, final String pid) {
+               final String methodName = "kill";
+
+               try {
+                       String c_launcher_path = 
Utils.resolvePlaceholderIfExists(
+                                       
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
+                                       System.getProperties());
+                       String cmdLine;
+                       String arg;
+                       boolean useDuccling = false;
+                       if (Utils.isWindows()) {
+                               cmdLine = "taskkill";
+                               arg = "/PID";
+                       } else {
+                               String useSpawn = System
+                                               
.getProperty("ducc.agent.launcher.use.ducc_spawn");
+                               if (useSpawn != null && 
useSpawn.toLowerCase().equals("true")) {
+                                       useDuccling = true;
+                               }
+                               cmdLine = "/bin/kill";
+                               arg = "-9";
+                       }
+                       String[] duccling_nolog;
+                       if (useDuccling) {
+                               duccling_nolog = new String[] { 
c_launcher_path, "-u", user,
+                                               "--", cmdLine, arg, pid };
+                       } else {
+                               duccling_nolog = new String[] { cmdLine, arg, 
pid };
+                       }
+
+                       // if (kill != null && Boolean.parseBoolean(kill) == 
true) {
+                       ProcessBuilder pb = new ProcessBuilder(duccling_nolog);
+                       pb.redirectErrorStream(true);
+                       java.lang.Process killedProcess = pb.start();
+                       InputStream is = killedProcess.getInputStream();
+                       BufferedReader reader = new BufferedReader(
+                                       new InputStreamReader(is));
+                       // String line = null;
+                       // read the next line from kill command
+                       while (reader.readLine() != null) {
+                               // dont care about the output, just drain the 
buffers
+                       }
+                       is.close();
+                       StringBuffer sb = new StringBuffer();
+                       for (String part : duccling_nolog) {
+                               sb.append(part).append(" ");
+                       }
+                       if (agentLogger == null) {
+                               System.out.println("--------- Killed Process:" 
+ pid
+                                               + " Owned by:" + user + " 
Command:" + sb.toString());
+
+                       } else {
+                               agentLogger.info(methodName, null,
+                                               "--------- Killed CGroup 
Process:" + pid + " Owned by:" + user
+                                                               + " Command:" + 
sb.toString());
+
+                       }
+
+               } catch (Exception e) {
+                       agentLogger.error(methodName, null,e );
+               }
+       }
+
+       /**
+        * Creates cgroup container with a given id and owner.
+        * 
+        * @param containerId
+        *            - new cgroup container id
+        * @param userId
+        *            - owner of the cgroup container
+        * @param useDuccSpawn
+        *            - use duccling to run 'cgcreate' command
+        * 
+        * @return - true on success, false otherwise
+        * 
+        * @throws Exception
+        */
+       public boolean createContainer(String containerId, String userId,
+                       boolean useDuccSpawn) throws Exception {
+
                try {
-                       String [] command = new String[] 
{"/usr/bin/cgcreate","-g", cgroupSubsystems+":ducc/"+containerId};
-                       int retCode = launchCommand(command, useDuccSpawn, 
userId, containerId);
-                       if ( retCode == 0 ) {
+                       String[] command = new String[] { "/usr/bin/cgcreate", 
"-t",
+                                       "ducc", "-a", "ducc", "-g",
+                                       cgroupSubsystems + ":ducc/" + 
containerId };
+                       int retCode = launchCommand(command, useDuccSpawn, 
"ducc",
+                                       containerId);
+                       if (retCode == 0) {
                                containerIds.add(containerId);
+                               agentLogger.info("createContainer", null, ">>>>"
+                                               + "SUCCESS - Created CGroup 
Container:" + containerId);
+
                                return true;
                        } else {
+                               agentLogger.info("createContainer", null, ">>>>"
+                                               + "FAILURE - Unable To Create 
CGroup Container:"
+                                               + containerId);
+
                                return false;
                        }
-               } catch ( Exception e ) {
+               } catch (Exception e) {
+                       agentLogger.error("createContainer", null, ">>>>"
+                                       + "FAILURE - Unable To Create CGroup 
Container:"
+                                       + containerId, e);
+
                        return false;
                }
        }
+
        /**
-        * Sets the max memory use for an existing cgroup container. 
+        * Sets the max memory use for an existing cgroup container.
         * 
-        * @param containerId - existing container id for which limit will be 
set
-        * @param userId - container owner
-        * @param useDuccSpawn - run 'cgset' command as a user
-        * @param containerMaxSize - max memory limit 
+        * @param containerId
+        *            - existing container id for which limit will be set
+        * @param userId
+        *            - container owner
+        * @param useDuccSpawn
+        *            - run 'cgset' command as a user
+        * @param containerMaxSize
+        *            - max memory limit
         * 
         * @return - true on success, false otherwise
         * 
         * @throws Exception
         */
-       public boolean setContainerMaxMemoryLimit( String containerId, String 
userId, boolean useDuccSpawn, long containerMaxSize) throws Exception {
+       public boolean setContainerMaxMemoryLimit(String containerId,
+                       String userId, boolean useDuccSpawn, long 
containerMaxSize)
+                       throws Exception {
                try {
-                       String [] command = new String[] 
{"/usr/bin/cgset","-r", "memory.limit_in_bytes="+containerMaxSize, 
"ducc/"+containerId};
-                       int retCode = launchCommand(command, useDuccSpawn, 
userId, containerId);
-                       return retCode == 0 ? true : false;
-               } catch ( Exception e ) {
+                       String[] command = new String[] { "/usr/bin/cgset", 
"-r",
+                                       "memory.limit_in_bytes=" + 
containerMaxSize,
+                                       "ducc/" + containerId };
+                       int retCode = launchCommand(command, useDuccSpawn, 
"ducc",
+                                       containerId);
+                       if (retCode == 0) {
+                               agentLogger.info("setContainerMaxMemoryLimit", 
null, ">>>>"
+                                               + "SUCCESS - Created CGroup 
Limit on Container:"
+                                               + containerId);
+                               return true;
+                       } else {
+                               agentLogger.info("setContainerMaxMemoryLimit", 
null, ">>>>"
+                                               + "FAILURE - Unable To Create 
CGroup Container:"
+                                               + containerId);
+                               return false;
+                       }
+               } catch (Exception e) {
+                       agentLogger.error("setContainerMaxMemoryLimit", null, 
">>>>"
+                                       + "FAILURE - Unable To Set Limit On 
CGroup Container:"
+                                       + containerId, e);
                        return false;
                }
        }
-               
+
        /**
-        * Removes cgroup container with a given id. Cgroups are implemented as
-        * a virtual file system. All is needed here is just rmdir. 
+        * Removes cgroup container with a given id. Cgroups are implemented as 
a
+        * virtual file system. All is needed here is just rmdir.
         * 
-        * @param containerId - cgroup to remove
+        * @param containerId
+        *            - cgroup to remove
         * @return - true on success, false otherwise
         * 
         * @throws Exception
         */
        public boolean destroyContainer(String containerId) throws Exception {
                try {
-                       if ( cgroupExists(cgroupBaseDir+"/"+containerId)) {
-                               String [] command = new String[] {"/bin/rmdir", 
cgroupBaseDir+"/"+containerId};
+                       if (cgroupExists(cgroupBaseDir + "/" + containerId)) {
+                               String[] command = new String[] { "/bin/rmdir",
+                                               cgroupBaseDir + "/" + 
containerId };
                                int retCode = launchCommand(command, false, 
"ducc", containerId);
-                               if ( retCode == 0 ) {
+                               if (retCode == 0) {
                                        containerIds.remove(containerId);
                                        return true;
                                } else {
@@ -123,33 +327,34 @@ public class CGroupsManager {
                                }
                        }
                        return true; // nothing to do, cgroup does not exist
-               } catch ( Exception e ) {
+               } catch (Exception e) {
                        return false;
                }
        }
-       
-       private int launchCommand(String[] command, boolean useDuccSpawn, 
String userId, String containerId) throws Exception {
+
+       private int launchCommand(String[] command, boolean useDuccSpawn,
+                       String userId, String containerId) throws Exception {
                String[] commandLine = null;
                try {
-                       //                                                      
-                       //      Use ducc_ling (c code) as a launcher for the 
actual process. The ducc_ling
-                       //  allows the process to run as a specified user in 
order to write out logs in
-                       //  user's space as oppose to ducc space.
-                       String c_launcher_path = 
-                                       Utils.resolvePlaceholderIfExists(
-                                                       
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());
-
-                       
+                       //
+                       // Use ducc_ling (c code) as a launcher for the actual 
process. The
+                       // ducc_ling
+                       // allows the process to run as a specified user in 
order to write
+                       // out logs in
+                       // user's space as opposed to ducc space.
+                       String c_launcher_path = 
Utils.resolvePlaceholderIfExists(
+                                       
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
+                                       System.getProperties());
 
-                       if ( useDuccSpawn && c_launcher_path != null ) {
-                               commandLine = new String[4+command.length];
+                       if (useDuccSpawn && c_launcher_path != null) {
+                               commandLine = new String[4 + command.length];
                                commandLine[0] = c_launcher_path;
                                commandLine[1] = "-u";
                                commandLine[2] = userId;
                                commandLine[3] = "--";
-                               
-                               int j=0;
-                               for(int i=4; i < commandLine.length;i++) {
+
+                               int j = 0;
+                               for (int i = 4; i < commandLine.length; i++) {
                                        commandLine[i] = command[j++];
                                }
                        } else {
@@ -158,38 +363,43 @@ public class CGroupsManager {
                        ProcessBuilder processLauncher = new ProcessBuilder();
                        processLauncher.command(commandLine);
                        processLauncher.redirectErrorStream();
-                       
+
                        java.lang.Process process = processLauncher.start();
-                       
-                       InputStreamReader in = new 
InputStreamReader(process.getInputStream());
+
+                       InputStreamReader in = new InputStreamReader(
+                                       process.getInputStream());
                        BufferedReader reader = new BufferedReader(in);
                        String line;
                        while ((line = reader.readLine()) != null) {
-                               System.out.println(">>>>"+line);
+                               agentLogger.info("launchCommand", null, ">>>>" 
+ line);
                        }
                        int retCode = process.waitFor();
                        return retCode;
-                       
-               } catch( Exception e) {
+
+               } catch (Exception e) {
                        StringBuffer sb = new StringBuffer();
-                       if ( commandLine != null ) {
-               for ( String cmdPart : commandLine ) {
-                         sb.append(cmdPart).append(" ");         
-                      }
-                       }
-           if ( agentLogger != null ) {
-                  agentLogger.error("launchCommand", null, "Unable to Launch 
Command:"+sb.toString(),e);
-           } else {
-                  System.out.println("CGroupsManager.launchCommand()- Unable 
to Launch Command:"+sb.toString());
-                          e.printStackTrace();
-           }
+                       if (commandLine != null) {
+                               for (String cmdPart : commandLine) {
+                                       sb.append(cmdPart).append(" ");
+                               }
+                       }
+                       if (agentLogger != null) {
+                               agentLogger.error("launchCommand", null,
+                                               "Unable to Launch Command:" + 
sb.toString(), e);
+                       } else {
+                               System.out
+                                               
.println("CGroupsManager.launchCommand()- Unable to Launch Command:"
+                                                               + 
sb.toString());
+                               e.printStackTrace();
+                       }
 
-               } 
-               return -1;  // failure
+               }
+               return -1; // failure
        }
+
        /**
-        * Return a Set of existing cgroup Ids found in the filesystem 
identified
-        * by 'cgroupBaseDir'.
+        * Return a Set of existing cgroup Ids found in the filesystem 
identified by
+        * 'cgroupBaseDir'.
         * 
         * @return - set of cgroup ids
         * 
@@ -197,24 +407,96 @@ public class CGroupsManager {
         */
        public Set<String> collectExistingContainers() throws Exception {
                File duccCGroupBaseDir = new File(cgroupBaseDir);
-               if ( duccCGroupBaseDir.exists()) {
+               if (duccCGroupBaseDir.exists()) {
                        File[] existingCGroups = duccCGroupBaseDir.listFiles();
-                       for (File cgroup : existingCGroups ) {
-                               if ( cgroup.isDirectory() ) {
+                       for (File cgroup : existingCGroups) {
+                               if (cgroup.isDirectory()) {
                                        containerIds.add(cgroup.getName());
                                }
                        }
-               } 
+               }
                return containerIds;
        }
+
        public String getDuccCGroupBaseDir() {
                return cgroupBaseDir;
        }
+
        public String getSubsystems() {
                return cgroupSubsystems;
        }
+
        public boolean cgroupExists(String cgroup) throws Exception {
                File duccCGroupBaseDir = new File(cgroup);
                return duccCGroupBaseDir.exists();
        }
+
+       /**
+        * Runs ps and collects the user, pid and ppid of each process on this node.
+        * Failures are logged rather than thrown, so the set may be empty or partial.
+        */
+       public Set<NodeProcessInfo> getProcessesOnNode() throws Exception {
+               String location = "getProcessesOnNode";
+               Set<NodeProcessInfo> processList = new HashSet<NodeProcessInfo>();
+               BufferedReader reader = null;
+               try {
+                       ProcessBuilder pb = new ProcessBuilder("ps", "-Ao", "user:12,pid,ppid,args", "--no-heading");
+                       pb.redirectErrorStream(true);
+                       java.lang.Process proc = pb.start();
+                       // spawn ps command and scrape the output
+                       reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+                       String line;
+                       while ((line = reader.readLine()) != null) {
+                               String tokens[] = line.trim().split("\\s+");
+                               // user, pid, ppid must all be present; skip shorter lines (e.g. blank or truncated)
+                               if (tokens.length >= 3) {
+                                       processList.add(new NodeProcessInfo(tokens[1], tokens[2], tokens[0]));
+                               }
+                       }
+               } catch (Exception e) {
+                       if (agentLogger == null) {
+                               e.printStackTrace();
+                       } else {
+                               agentLogger.error(location, null, e);
+                       }
+               } finally {
+                       if (reader != null) {
+                               try {
+                                       reader.close();
+                               } catch (Exception ignored) {
+                                       // best-effort close; the result set is already built
+                               }
+                       }
+               }
+               return processList;
+       }
+       /** Value holder for one process entry: its pid, ppid and userid, all kept as strings. */
+       public class NodeProcessInfo {
+               private String pid; // process id
+               private String ppid; // parent process id
+               private String userid; // user id associated with the process
+               /** Package-private constructor; note the argument order is pid, ppid, uid. */
+               NodeProcessInfo(String pid, String ppid, String uid) {
+                       this.pid = pid;
+                       this.ppid = ppid;
+                       userid = uid;
+               }
+               /** @return the pid passed to the constructor */
+               public String getPid() {
+                       return pid;
+               }
+               /** @return the ppid passed to the constructor */
+               public String getPpid() {
+                       return ppid;
+               }
+               /** @return the current userid */
+               public String getUserid() {
+                       return userid;
+               }
+               /** Replaces the userid; pid and ppid have no setters. */
+               public void setUserid(String userid) {
+                       this.userid = userid;
+               }
+
+       }
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java?rev=1469537&r1=1469536&r2=1469537&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
 Thu Apr 18 19:41:46 2013
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.RuntimeErrorException;
+
 import org.apache.uima.ducc.agent.NodeAgent;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.TimeStamp;
@@ -143,18 +145,24 @@ public class DuccCommandExecutor extends
                                                String containerId = 
getContainerId();
                                                logger.info(methodName, null, 
"Creating CGroup with ID:"+containerId);                                  
                                                if ( 
!agent.cgroupsManager.cgroupExists(agent.cgroupsManager.getDuccCGroupBaseDir()+"/"+containerId)
 ) {
-                                                       
+                                                       boolean failed = false;
                                                        // create cgroup 
container for JDs
                                                        try {
                                                                if ( 
createCGroupContainer(duccProcess, containerId, 
((ManagedProcess)super.managedProcess).getOwner()) ) {
                                                                        
logger.info(methodName, null, "Created CGroup with ID:"+containerId+" With 
Memory 
Limit="+((ManagedProcess)super.managedProcess).getDuccProcess().getCGroup().getMaxMemoryLimit()+"
 Bytes");
                                                                } else {
                                                                        
logger.info(methodName, null, "Failed To Create CGroup with ID:"+containerId);
+                                                                       
duccProcess.setProcessState(ProcessState.Failed);
+                                                                       
duccProcess.setReasonForStoppingProcess("CGroupsPermissionDenied");
+                                                                       failed 
= true;
                                                                }
                                                        } catch( Exception e) {
                                                                
logger.error(methodName, null, e);
                                                                
                                                        }
+                                                       if ( failed ) {
+                                                               throw new 
RuntimeException("The Agent is Unable To Create A CGroup with Container ID: 
"+containerId+". Rejecting Deployment of Process with 
ID:"+duccProcess.getDuccId());
+                                                       }
                                                } else {
                                                        logger.info(methodName, 
null, "CGroup Exists with ID:"+containerId);                                    
 

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java?rev=1469537&r1=1469536&r2=1469537&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
 Thu Apr 18 19:41:46 2013
@@ -55,8 +55,8 @@ implements ProcessMetricsProcessor {
        private DuccLogger logger;
        private ManagedProcess managedProcess;
        private NodeAgent agent;
-  private int fudgeFactor = 5;  // default is 5%
-       private int logCounter=0;
+    private int fudgeFactor = 5;  // default is 5%
+       //private int logCounter=0;
        public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess 
process, NodeAgent agent,String statmFilePath, String nodeStatFilePath, String 
processStatFilePath, ManagedProcess managedProcess) throws 
FileNotFoundException{
                this.logger = logger;
                statmFile = new RandomAccessFile(statmFilePath, "r");
@@ -64,7 +64,7 @@ implements ProcessMetricsProcessor {
                processStatFile = new RandomAccessFile(processStatFilePath, 
"r");
                this.managedProcess = managedProcess;
                this.agent = agent;
-               pool = Executors.newFixedThreadPool(3);
+               pool = Executors.newFixedThreadPool(30);
                this.process = process;
     gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
                //      read the block size from ducc.properties
@@ -113,8 +113,9 @@ implements ProcessMetricsProcessor {
                        String DUCC_HOME = Utils.findDuccHome();
                        //      executes script 
DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up swap used by a 
process
                        DuccProcessSwapSpaceUsage processSwapSpaceUsage = 
-                                       new 
DuccProcessSwapSpaceUsage(process.getPID(),DUCC_HOME+"/admin/ducc_get_process_swap_usage.sh",
 logger);
-                       
+                                       new 
DuccProcessSwapSpaceUsage(process.getPID(), managedProcess.getOwner(), 
DUCC_HOME+"/admin/ducc_get_process_swap_usage.sh", logger);
+
+
                        logger.trace("process", null, "----------- 
PID:"+process.getPID()+" Cumulative CPU Time 
(jiffies):"+processCpuUsage.get().getTotalJiffies()); 
                        //      Publish cumulative CPU usage
                        
process.setCpuTime(processCpuUsage.get().getTotalJiffies());
@@ -122,16 +123,17 @@ implements ProcessMetricsProcessor {
                        // collects process Major faults (swap in memory)
                        process.setMajorFaults(majorFaults);
                        //      Current Process Swap Usage in bytes
+                       long st = System.currentTimeMillis();
                        long processSwapUsage = 
processSwapSpaceUsage.getSwapUsage()*1024;
                        //      collects swap usage from /proc/<PID>/smaps file 
via a script DUCC_HOME/admin/collect_process_swap_usage.sh
                        process.setSwapUsage(processSwapUsage);
-                       if ( (logCounter % 100 ) == 0 ) {
-                          logger.info("process", null, "----------- 
PID:"+process.getPID()+" Major Faults:"+majorFaults+" Process Swap 
Usage:"+processSwapUsage); 
-                       }
-                       logCounter++;
+               //      if ( (logCounter % 2 ) == 0 ) {
+                          logger.info("process", null, "----------- 
PID:"+process.getPID()+" Major Faults:"+majorFaults+" Process Swap 
Usage:"+processSwapUsage+" Max Swap Usage 
Allowed:"+managedProcess.getMaxSwapThreshold()+" Time to Collect Swap Usage:"+ 
(System.currentTimeMillis()-st)); 
+                       //}
+                       //logCounter++;
                        
                        if (processSwapUsage > 0 && processSwapUsage > 
managedProcess.getMaxSwapThreshold()) {
-                               logger.error("process", null, 
"\n\n********************************************************\n\tProcess with 
PID:"+managedProcess.getPid()+ " Exceeded its max swap usage assignment  of "+ 
managedProcess.getMaxSwapThreshold()+" MBs. This Process Swap Usage is: 
"+processSwapUsage+" MBs .Killing process 
...\n********************************************************\n\n" );
+                               logger.error("process", null, 
"\n\n********************************************************\n\tProcess with 
PID:"+managedProcess.getPid()+ " Exceeded its Max Swap Usage Threshold of "+ 
(managedProcess.getMaxSwapThreshold()/1024)/1024+" MBs. The Current Swap Usage 
is: "+(processSwapUsage/1024)/1024+" MBs .Killing process 
...\n********************************************************\n\n" );
                                try {
                                        managedProcess.kill();  // mark it for 
death
                                        
process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededSwapThreshold.toString());

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java?rev=1469537&r1=1469536&r2=1469537&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
 Thu Apr 18 19:41:46 2013
@@ -5,7 +5,7 @@ import org.apache.uima.ducc.common.node.
 public class DuccProcessMemoryPageLoadUsage extends ByteBufferParser implements
                ProcessMemoryPageLoadUsage {
        private static final long serialVersionUID = 1L;
-       public static final int MAJORFAULTSFLD=12;
+       public static final int MAJORFAULTSFLD=11;
        
        public DuccProcessMemoryPageLoadUsage(byte[] memInfoBuffer,
                        int[] memInfoFieldOffsets, int[] memInfoFiledLengths) {

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java?rev=1469537&r1=1469536&r2=1469537&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
 Thu Apr 18 19:41:46 2013
@@ -3,17 +3,25 @@ package org.apache.uima.ducc.common.agen
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 
+
 import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.Utils;
 
 public class DuccProcessSwapSpaceUsage implements ProcessSwapSpaceUsage {
        String pid=null;
        String execScript=null;
        DuccLogger logger=null;
+       String[] command;
        
-       public DuccProcessSwapSpaceUsage( String pid, String execScript, 
DuccLogger logger) {
+       public DuccProcessSwapSpaceUsage( String pid, String owner, String 
execScript, DuccLogger logger) {
                this.pid = pid;
                this.execScript = execScript;
                this.logger = logger;
+           String c_launcher_path = 
+                   Utils.resolvePlaceholderIfExists(
+                           
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());
+           command = new String[] { c_launcher_path,
+                     "-u", owner, "--", execScript, pid }; 
        }
        public long getSwapUsage() {
                long swapusage=0;
@@ -21,17 +29,29 @@ public class DuccProcessSwapSpaceUsage i
                        InputStreamReader in = null;
                        try {
                                ProcessBuilder pb = new ProcessBuilder();
-                               String[] command = {execScript,pid};
-                               pb.command(command);
+                               //String[] command = {execScript,pid};
+                               pb.command(command); //command);
+                               String cmd = "";
+                               for( String c : command) {
+                                       cmd += " "+ c;
+                               }
+                               //logger.info("------------ getSwapUsage-", 
null, cmd);
                                pb.redirectErrorStream(true);
                                Process swapCollectorProcess = pb.start();
                                in = new 
InputStreamReader(swapCollectorProcess.getInputStream());
                                BufferedReader reader = new BufferedReader(in);
                                String line=null;
-                               
-                               while ((line = reader.readLine()) != null && 
line.trim().length() > 0 ) {
+                               boolean skip = true;
+                               while ((line = reader.readLine()) != null) {
                                        try {
-                                               swapusage = 
Long.parseLong(line.trim());
+                                               if ( line.startsWith("1001")) {
+                                                       skip = false;
+                                                       continue;
+                                               }
+                                               if (!skip) {
+                                                       swapusage = 
Long.parseLong(line.trim());
+                                                       
logger.info("getSwapUsage-",null, "PID:"+pid+" Swap Usage:"+line);
+                                               }
                                        } catch( NumberFormatException e) {
                                                logger.error("getSwapUsage", 
null, line);
                                        }


Reply via email to