[ https://issues.apache.org/jira/browse/CHUKWA-17?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng updated CHUKWA-17:
------------------------

    Status: Patch Available  (was: Open)

Index: src/contrib/chukwa/bin/exec-data-loader.sh
===================================================================
--- src/contrib/chukwa/bin/exec-data-loader.sh  (revision 751021)
+++ src/contrib/chukwa/bin/exec-data-loader.sh  (working copy)
@@ -35,6 +35,9 @@
   if [ "X$PARM" == "Xtop" ]; then
     kill -9 `cat ${CHUKWA_PID_DIR}/Top-data-loader.pid`
   fi
+  if [ "X$PARM" == "Xps" ]; then
+    kill -9 `cat ${CHUKWA_PID_DIR}/Ps-data-loader.pid`
+  fi
   if [ "X$PARM" == "Xdf" ]; then
     kill -9 `cat ${CHUKWA_PID_DIR}/Df-data-loader.pid`
   fi
@@ -71,6 +74,10 @@
  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Top -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec top -b -n 1 -c &
 fi
 
+if [ "X$PARM" == "Xps" ]; then
+  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo pid,user,vsize,size,pcpu,pmem,time,start,cmd &
+fi
+
 if [ "X$PARM" == "Xdf" ]; then
  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Df -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec df -l &
 fi
Index: src/contrib/chukwa/bin/systemDataLoader.sh
===================================================================
--- src/contrib/chukwa/bin/systemDataLoader.sh  (revision 751023)
+++ src/contrib/chukwa/bin/systemDataLoader.sh  (working copy)
@@ -78,6 +78,23 @@
   fi
 
   EXISTS=0
+  pidFile="${CHUKWA_PID_DIR}/Ps-data-loader.pid"
+  if [ -f $pidFile ]; then
+    pid=`head ${pidFile}`
+    ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+    if [ $ChildPIDRunningStatus -ge 1 ]; then
+      EXISTS=1
+    fi
+  fi
+
+  if [ ${EXISTS} -lt 1 ]; then
+    echo "ps data loader is stopped."
+    RESULT=1
+  else
+    echo "ps data loader is running."
+  fi
+
+  EXISTS=0
   pidFile="${CHUKWA_PID_DIR}/Df-data-loader.pid"
   if [ -f $pidFile ]; then
     pid=`head ${pidFile}`
@@ -125,6 +142,9 @@
   if [ -f ${CHUKWA_PID_DIR}/Top-data-loader.pid ]; then
     kill -9 `cat ${CHUKWA_PID_DIR}/Top-data-loader.pid`
   fi
+  if [ -f ${CHUKWA_PID_DIR}/Ps-data-loader.pid ]; then
+    kill -9 `cat ${CHUKWA_PID_DIR}/Ps-data-loader.pid`
+  fi
   if [ -f ${CHUKWA_PID_DIR}/Df-data-loader.pid ]; then
     kill -9 `cat ${CHUKWA_PID_DIR}/Df-data-loader.pid`
   fi
@@ -200,6 +220,20 @@
 fi
 
 EXISTS=0
+pidFile="${CHUKWA_PID_DIR}/Ps-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo pid,user,vsize,size,pcpu,pmem,time,start,cmd &
+fi
+
+EXISTS=0
 pidFile="${CHUKWA_PID_DIR}/Df-data-loader.pid"
 if [ -f $pidFile ]; then
   pid=`head ${pidFile}`
Index: src/contrib/chukwa/conf/chukwa-demux-conf.xml
===================================================================
--- src/contrib/chukwa/conf/chukwa-demux-conf.xml       (revision 751024)
+++ src/contrib/chukwa/conf/chukwa-demux-conf.xml       (working copy)
@@ -81,6 +81,12 @@
    </property>
 
    <property>
+    <name>Ps</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps</value>
+    <description>Parser class for ps</description>
+   </property>
+
+   <property>
     <name>Torque</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Torque</value>
     <description>Parser class for Parsing qstat and tracejob</description>
Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
===================================================================
--- src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java  (revision 0)
+++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java  (revision 0)
@@ -0,0 +1,47 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LogEntry {
+       private final static SimpleDateFormat sdf = new SimpleDateFormat(
+                       "yyyy-MM-dd HH:mm");
+
+       private Date date;
+       private String logLevel;
+       private String className;
+       private String body;
+
+       public LogEntry(String recordEntry) throws ParseException {
+               String dStr = recordEntry.substring(0, 23);
+               date = sdf.parse(dStr);
+               int start = 24;
+               int idx = recordEntry.indexOf(' ', start);
+               logLevel = recordEntry.substring(start, idx);
+               start = idx + 1;
+               idx = recordEntry.indexOf(' ', start);
+               className = recordEntry.substring(start, idx - 1);
+               body = recordEntry.substring(idx + 1);
+       }
+
+       public Date getDate() {
+               return date;
+       }
+
+       public void setDate(Date date) {
+               this.date = date;
+       }
+
+       public String getLogLevel() {
+               return logLevel;
+       }
+
+       public String getClassName() {
+               return className;
+       }
+
+       public String getBody() {
+               return body;
+       }
+}
Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
===================================================================
--- src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java  (revision 0)
+++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java  (revision 0)
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Ps extends AbstractProcessor {
+       static Logger log = Logger.getLogger(Ps.class);
+
+       @Override
+       protected void parse(String recordEntry,
+                       OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+                       Reporter reporter) throws Throwable {
+               LogEntry log = new LogEntry(recordEntry);
+               PsOutput ps = new PsOutput(log.getBody());
+               for (HashMap<String, String> processInfo : ps.getProcessList()) {
+                       key = new ChukwaRecordKey();
+                       ChukwaRecord record = new ChukwaRecord();
+                       this.buildGenericRecord(record, null, log.getDate().getTime(), "Ps");
+                       for (Entry<String, String> entry : processInfo.entrySet()) {
+                               record.add(entry.getKey(), entry.getValue());
+                       }
+                       output.collect(key, record);
+               }
+       }
+
+       public static class PsOutput {
+
+               // processes info
+               private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>();
+
+               public PsOutput(String psCmdOutput) throws InvalidPsRecord {
+                       if (psCmdOutput == null || psCmdOutput.length() == 0)
+                               return;
+
+                       String[] lines = psCmdOutput.split("[\n\r]+");
+
+                       // at least two lines
+                       if (lines.length < 2)
+                               return;
+
+                       // header
+                       ArrayList<String> header = new ArrayList<String>();
+                       Matcher matcher = Pattern.compile("[^ ^\t]+").matcher(lines[0]);
+                       while (matcher.find()) {
+                               header.add(matcher.group(0));
+                       }
+                       if (!header.get(header.size() - 1).equals("CMD")) {
+                               throw new InvalidPsRecord("CMD must be the last column");
+                       }
+
+                       // records
+                       boolean foundInitCmd = false;
+                       for (int line = 1; line < lines.length; line++) {
+                               HashMap<String, String> record = new HashMap<String, String>();
+                               recordList.add(record);
+
+                               matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]);
+                               for(int index = 0; index<header.size(); index++) {
+                                       String key = header.get(index);
+                                       matcher.find();
+                                       if (!key.equals("CMD")) {
+                                               String value = matcher.group(0);
+                                               /**
+                                                * For STARTED column, it could be in two formats: "MMM dd" or "hh:mm:ss".
+                                                * If we use ' ' as the delimiter, we must read twice to the date if it's
+                                                * with "MMM dd" format.
+                                                */
+                                               if (key.equals("STARTED")) {
+                                                       char c = value.charAt(0);
+                                                       if ( c < '0' || c > '9') {
+                                                               matcher.find();
+                                                               value += matcher.group(0);
+                                                       }
+                                               }
+                                               record.put(key, value);
+                                       } else {
+                                               // reached the cmd part. all remains should be put
+                                               // together as the command
+                                               String value = lines[line].substring(matcher.start());
+                                               record.put(key, value);
+                                               if(!foundInitCmd)
+                                                       foundInitCmd = value.startsWith("init");
+                                               break;
+                                       }
+                               }
+                       }
+                       if(!foundInitCmd)
+                               throw new InvalidPsRecord("Did not find 'init' cmd");
+               }
+
+               public ArrayList<HashMap<String, String>> getProcessList() {
+                       return recordList;
+               }
+       }
+
+       public static class InvalidPsRecord extends Exception {
+               private static final long serialVersionUID = 1L;
+
+               public InvalidPsRecord() {
+               }
+
+               public InvalidPsRecord(String arg0) {
+                       super(arg0);
+               }
+
+               public InvalidPsRecord(Throwable arg0) {
+                       super(arg0);
+               }
+
+               public InvalidPsRecord(String arg0, Throwable arg1) {
+                       super(arg0, arg1);
+               }
+       }
+}
Index: src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
===================================================================
--- src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java  (revision 749503)
+++ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java  (working copy)
@@ -85,20 +85,6 @@
                                this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
                                output.collect(key, record);
                                
-                               StringBuffer buffer = new StringBuffer();
-                               //FIXME please validate this
-                               while (i < lines.length) {
-                                       record = null;
-                                       buffer.append(lines[i]+"\n");
-                                       i++;
-                                       
-                               }
-                               record = new ChukwaRecord();
-                               key = new ChukwaRecordKey();
-                               this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
-                               //Output Top info to database
-                               output.collect(key, record);
-
                                // End of parsing
                        } catch (Exception e)
                        {
Index: src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java
===================================================================
--- src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java  (revision 0)
+++ src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java  (revision 0)
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ps;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.InvalidPsRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.PsOutput;
+
+import junit.framework.TestCase;
+
+public class PsOutputTest extends TestCase {
+
+       public void testGetRecordListFromPsCmd() throws IOException, InvalidPsRecord {
+               Runtime runtime = Runtime.getRuntime();
+               Process process = runtime.exec("ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd");
+               StringBuffer sb = new StringBuffer();
+               InputStream is = process.getInputStream();
+               byte[] buffer = new byte[1024];
+               while(true) {
+                       int len = is.read(buffer);
+                       if(len == -1)
+                               break;
+                       sb.append(new String(buffer, 0, len));
+               }
+               String output = sb.toString();
+               
+               PsOutput pso = new PsOutput(output);
+               
+               // search init process
+               for(HashMap<String, String> processInfo : pso.getProcessList()) {
+                       for(Entry<String, String> entry : processInfo.entrySet()) {
+                               if(entry.getKey().equals("CMD") && entry.getValue().startsWith("init")) {
+                                       return;
+                               }
+                       }
+               }
+               
+               throw new InvalidPsRecord(output);
+       }
+
+       public void testGetRecordList() throws IOException, InvalidPsRecord {
+               // below is from command "ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd"
+               String output = 
+                       "  PID USER        VSZ    SZ %CPU %MEM     TIME START  STARTED CMD\n" +
+                       "    1 root       2064   284  0.0  0.0 00:00:02  2008   Dec 29 init [5]\n" +
+                       "    2 root          0     0  0.0  0.0 00:00:01  2008   Dec 29 [migration/0]\n" +
+                       "20270 chzhang    4248   588  0.0  0.0 00:00:00 15:32 15:32:36 ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd\n" +
+                       "28371 angelac2   7100  1716  0.0  0.0 00:00:00 Feb27   Feb 27 /usr/libexec/gconfd-2 5\n";
+               
+               PsOutput pso = new PsOutput(output);
+               ArrayList<HashMap<String, String>> processes = pso.getProcessList();
+               assertEquals(4, processes.size());
+               assertEquals("Dec29", processes.get(0).get("STARTED"));
+               assertEquals("15:32:36", processes.get(2).get("STARTED"));
+               assertEquals("ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd", processes.get(2).get("CMD"));
+       }
+
+}
Index: src/contrib/chukwa/tools/service/chukwa-ps/run
===================================================================
--- src/contrib/chukwa/tools/service/chukwa-ps/run      (revision 0)
+++ src/contrib/chukwa/tools/service/chukwa-ps/run      (revision 0)
@@ -0,0 +1,5 @@
+#!/bin/sh
+CHUKWA_CONF_DIR=/usr/local/chukwa/conf
+
+exec setuidgid gmon /usr/local/chukwa/bin/exec-data-loader.sh --config $CHUKWA_CONF_DIR ps
+


> Enhance process information collection 
> ---------------------------------------
>
>                 Key: CHUKWA-17
>                 URL: https://issues.apache.org/jira/browse/CHUKWA-17
>             Project: Hadoop Chukwa
>          Issue Type: New Feature
>         Environment: Redhat EL 5.1, Java 6
>            Reporter: Cheng
>            Priority: Minor
>
> We are currently getting the process information from "top", and a given
> process entry looks like this:
> 28288 gmon      18   0 2225m 277m 8876 S    0  7.0   1:38.56 java
> However, that's still not complete. For it to be truly useful, we would want
> something like this:
> gmon     28288 28244  0 Nov13 ?        00:01:38 /grid/0/java/jdk/bin/java 
> -Xms1000M -Xmx2000M -DAPP=collector
> -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=/gri........
> -DCHUKWA_CONF_DIR=/gri......./../conf 
> -DCHUKWA_LOG_DIR=/grid.........bin/../var/log -classpath
> :/grid..........
> We can get that information with the command below:
>         ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd
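For illustration only (not part of the attached patch): below is a minimal, hypothetical driver class (PsOutputDemo, a name introduced here) that feeds a small sample of that ps output through the PsOutput parser added in Ps.java above and prints the parsed fields. Class and package names are taken from the patch; the sample rows mirror the PsOutputTest fixture.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.PsOutput;

public class PsOutputDemo {
  public static void main(String[] args) throws Exception {
    // Header plus one "init" row, in the layout the parser expects:
    // CMD must be the last column and an init process must be present.
    String sample =
        "  PID USER        VSZ    SZ %CPU %MEM     TIME START  STARTED CMD\n" +
        "    1 root       2064   284  0.0  0.0 00:00:02  2008   Dec 29 init [5]\n";
    PsOutput ps = new PsOutput(sample);
    for (HashMap<String, String> proc : ps.getProcessList()) {
      for (Map.Entry<String, String> e : proc.entrySet()) {
        // e.g. PID = 1, USER = root, STARTED = Dec29, CMD = init [5]
        System.out.println(e.getKey() + " = " + e.getValue());
      }
    }
  }
}

In the demux flow, each of these key/value maps becomes one ChukwaRecord of record type "Ps", as wired up by the Ps entry added to chukwa-demux-conf.xml.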

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
