[
https://issues.apache.org/jira/browse/CHUKWA-17?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cheng updated CHUKWA-17:
------------------------
Status: Patch Available (was: Open)
Index: src/contrib/chukwa/bin/exec-data-loader.sh
===================================================================
--- src/contrib/chukwa/bin/exec-data-loader.sh (revision 751021)
+++ src/contrib/chukwa/bin/exec-data-loader.sh (working copy)
@@ -35,6 +35,9 @@
if [ "X$PARM" == "Xtop" ]; then
kill -9 `cat ${CHUKWA_PID_DIR}/Top-data-loader.pid`
fi
+ if [ "X$PARM" == "Xps" ]; then
+ kill -9 `cat ${CHUKWA_PID_DIR}/Ps-data-loader.pid`
+ fi
if [ "X$PARM" == "Xdf" ]; then
kill -9 `cat ${CHUKWA_PID_DIR}/Df-data-loader.pid`
fi
@@ -71,6 +74,10 @@
${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME}
-DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR}
-DRECORD_TYPE=Top -Dlog4j.configuration=system-data-loader.properties
-classpath
${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR}
org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec top -b -n 1 -c &
fi
+if [ "X$PARM" == "Xps" ]; then
+ ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME}
-DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR}
-DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath
${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR}
org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo
pid,user,vsize,size,pcpu,pmem,time,start,cmd &
+fi
+
if [ "X$PARM" == "Xdf" ]; then
${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME}
-DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR}
-DRECORD_TYPE=Df -Dlog4j.configuration=system-data-loader.properties -classpath
${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR}
org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec df -l &
fi
Index: src/contrib/chukwa/bin/systemDataLoader.sh
===================================================================
--- src/contrib/chukwa/bin/systemDataLoader.sh (revision 751023)
+++ src/contrib/chukwa/bin/systemDataLoader.sh (working copy)
@@ -78,6 +78,23 @@
fi
EXISTS=0
+ pidFile="${CHUKWA_PID_DIR}/Ps-data-loader.pid"
+ if [ -f $pidFile ]; then
+ pid=`head ${pidFile}`
+ ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep |
wc -l`
+ if [ $ChildPIDRunningStatus -ge 1 ]; then
+ EXISTS=1
+ fi
+ fi
+
+ if [ ${EXISTS} -lt 1 ]; then
+ echo "ps data loader is stopped."
+ RESULT=1
+ else
+ echo "ps data loader is running."
+ fi
+
+ EXISTS=0
pidFile="${CHUKWA_PID_DIR}/Df-data-loader.pid"
if [ -f $pidFile ]; then
pid=`head ${pidFile}`
@@ -125,6 +142,9 @@
if [ -f ${CHUKWA_PID_DIR}/Top-data-loader.pid ]; then
kill -9 `cat ${CHUKWA_PID_DIR}/Top-data-loader.pid`
fi
+ if [ -f ${CHUKWA_PID_DIR}/Ps-data-loader.pid ]; then
+ kill -9 `cat ${CHUKWA_PID_DIR}/Ps-data-loader.pid`
+ fi
if [ -f ${CHUKWA_PID_DIR}/Df-data-loader.pid ]; then
kill -9 `cat ${CHUKWA_PID_DIR}/Df-data-loader.pid`
fi
@@ -200,6 +220,20 @@
fi
EXISTS=0
+pidFile="${CHUKWA_PID_DIR}/Ps-data-loader.pid"
+if [ -f $pidFile ]; then
+ pid=`head ${pidFile}`
+ ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc
-l`
+ if [ $ChildPIDRunningStatus -ge 1 ]; then
+ EXISTS=1
+ fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+ ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME}
-DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR}
-DRECORD_TYPE=Ps -Dlog4j.configuration=system-data-loader.properties -classpath
${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR}
org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec ps axo
pid,user,vsize,size,pcpu,pmem,time,start,cmd &
+fi
+
+EXISTS=0
pidFile="${CHUKWA_PID_DIR}/Df-data-loader.pid"
if [ -f $pidFile ]; then
pid=`head ${pidFile}`
Index: src/contrib/chukwa/conf/chukwa-demux-conf.xml
===================================================================
--- src/contrib/chukwa/conf/chukwa-demux-conf.xml (revision 751024)
+++ src/contrib/chukwa/conf/chukwa-demux-conf.xml (working copy)
@@ -81,6 +81,12 @@
</property>
<property>
+ <name>Ps</name>
+ <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps</value>
+ <description>Parser class for parsing ps command output</description>
+ </property>
+
+ <property>
<name>Torque</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Torque</value>
<description>Parser class for Parsing qstat and tracejob</description>
Index:
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
===================================================================
---
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
(revision 0)
+++
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
(revision 0)
@@ -0,0 +1,47 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LogEntry {
+ private final static SimpleDateFormat sdf = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm");
+
+ private Date date;
+ private String logLevel;
+ private String className;
+ private String body;
+
+ public LogEntry(String recordEntry) throws ParseException {
+ String dStr = recordEntry.substring(0, 23);
+ date = sdf.parse(dStr);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ logLevel = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ className = recordEntry.substring(start, idx - 1);
+ body = recordEntry.substring(idx + 1);
+ }
+
+ public Date getDate() {
+ return date;
+ }
+
+ public void setDate(Date date) {
+ this.date = date;
+ }
+
+ public String getLogLevel() {
+ return logLevel;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public String getBody() {
+ return body;
+ }
+}
Index:
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
===================================================================
---
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
(revision 0)
+++
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
(revision 0)
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Ps extends AbstractProcessor {
+ static Logger log = Logger.getLogger(Ps.class);
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+ Reporter reporter) throws Throwable {
+ LogEntry log = new LogEntry(recordEntry);
+ PsOutput ps = new PsOutput(log.getBody());
+ for (HashMap<String, String> processInfo : ps.getProcessList())
{
+ key = new ChukwaRecordKey();
+ ChukwaRecord record = new ChukwaRecord();
+ this.buildGenericRecord(record, null,
log.getDate().getTime(), "Ps");
+ for (Entry<String, String> entry :
processInfo.entrySet()) {
+ record.add(entry.getKey(), entry.getValue());
+ }
+ output.collect(key, record);
+ }
+ }
+
+ public static class PsOutput {
+
+ // processes info
+ private ArrayList<HashMap<String, String>> recordList = new
ArrayList<HashMap<String, String>>();
+
+ public PsOutput(String psCmdOutput) throws InvalidPsRecord {
+ if (psCmdOutput == null || psCmdOutput.length() == 0)
+ return;
+
+ String[] lines = psCmdOutput.split("[\n\r]+");
+
+ // at least two lines
+ if (lines.length < 2)
+ return;
+
+ // header
+ ArrayList<String> header = new ArrayList<String>();
+ Matcher matcher = Pattern.compile("[^
^\t]+").matcher(lines[0]);
+ while (matcher.find()) {
+ header.add(matcher.group(0));
+ }
+ if (!header.get(header.size() - 1).equals("CMD")) {
+ throw new InvalidPsRecord("CMD must be the last
column");
+ }
+
+ // records
+ boolean foundInitCmd = false;
+ for (int line = 1; line < lines.length; line++) {
+ HashMap<String, String> record = new
HashMap<String, String>();
+ recordList.add(record);
+
+ matcher = Pattern.compile("[^
^\t]+").matcher(lines[line]);
+ for(int index = 0; index<header.size();
index++) {
+ String key = header.get(index);
+ matcher.find();
+ if (!key.equals("CMD")) {
+ String value = matcher.group(0);
+ /**
+ * For STARTED column, it could
be in two formats: "MMM dd" or "hh:mm:ss".
+ * If we use ' ' as the
delimiter, we must read twice to the date if it's
+ * with "MMM dd" format.
+ */
+ if (key.equals("STARTED")) {
+ char c =
value.charAt(0);
+ if ( c < '0' || c >
'9') {
+ matcher.find();
+ value +=
matcher.group(0);
+ }
+ }
+ record.put(key, value);
+ } else {
+ // reached the cmd part. all
remains should be put
+ // together as the command
+ String value =
lines[line].substring(matcher.start());
+ record.put(key, value);
+ if(!foundInitCmd)
+ foundInitCmd =
value.startsWith("init");
+ break;
+ }
+ }
+ }
+ if(!foundInitCmd)
+ throw new InvalidPsRecord("Did not find 'init'
cmd");
+ }
+
+ public ArrayList<HashMap<String, String>> getProcessList() {
+ return recordList;
+ }
+ }
+
+ public static class InvalidPsRecord extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidPsRecord() {
+ }
+
+ public InvalidPsRecord(String arg0) {
+ super(arg0);
+ }
+
+ public InvalidPsRecord(Throwable arg0) {
+ super(arg0);
+ }
+
+ public InvalidPsRecord(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
+ }
+}
Index:
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
===================================================================
---
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
(revision 749503)
+++
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
(working copy)
@@ -85,20 +85,6 @@
this.buildGenericRecord(record, null,
d.getTime(), "SystemMetrics");
output.collect(key, record);
- StringBuffer buffer = new StringBuffer();
- //FIXME please validate this
- while (i < lines.length) {
- record = null;
- buffer.append(lines[i]+"\n");
- i++;
-
- }
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- this.buildGenericRecord(record,
buffer.toString(), d.getTime(), "Top");
- //Output Top info to database
- output.collect(key, record);
-
// End of parsing
} catch (Exception e)
{
Index:
src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java
===================================================================
---
src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java
(revision 0)
+++
src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PsOutputTest.java
(revision 0)
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ps;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import
org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.InvalidPsRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Ps.PsOutput;
+
+import junit.framework.TestCase;
+
+public class PsOutputTest extends TestCase {
+
+ public void testGetRecordListFromPsCmd() throws IOException,
InvalidPsRecord {
+ Runtime runtime = Runtime.getRuntime();
+ Process process = runtime.exec("ps axo
pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd");
+ StringBuffer sb = new StringBuffer();
+ InputStream is = process.getInputStream();
+ byte[] buffer = new byte[1024];
+ while(true) {
+ int len = is.read(buffer);
+ if(len == -1)
+ break;
+ sb.append(new String(buffer, 0, len));
+ }
+ String output = sb.toString();
+
+ PsOutput pso = new PsOutput(output);
+
+ // search init process
+ for(HashMap<String, String> processInfo : pso.getProcessList())
{
+ for(Entry<String, String> entry :
processInfo.entrySet()) {
+ if(entry.getKey().equals("CMD") &&
entry.getValue().startsWith("init")) {
+ return;
+ }
+ }
+ }
+
+ throw new InvalidPsRecord(output);
+ }
+
+ public void testGetRecordList() throws IOException, InvalidPsRecord {
+ // below is from command "ps axo
pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd"
+ String output =
+ " PID USER VSZ SZ %CPU %MEM TIME START
STARTED CMD\n" +
+ " 1 root 2064 284 0.0 0.0 00:00:02 2008
Dec 29 init [5]\n" +
+ " 2 root 0 0 0.0 0.0 00:00:01 2008
Dec 29 [migration/0]\n" +
+ "20270 chzhang 4248 588 0.0 0.0 00:00:00 15:32
15:32:36 ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd\n" +
+ "28371 angelac2 7100 1716 0.0 0.0 00:00:00 Feb27
Feb 27 /usr/libexec/gconfd-2 5\n";
+
+ PsOutput pso = new PsOutput(output);
+ ArrayList<HashMap<String, String>> processes =
pso.getProcessList();
+ assertEquals(4, processes.size());
+ assertEquals("Dec29", processes.get(0).get("STARTED"));
+ assertEquals("15:32:36", processes.get(2).get("STARTED"));
+ assertEquals("ps axo
pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd",
processes.get(2).get("CMD"));
+ }
+
+}
Index: src/contrib/chukwa/tools/service/chukwa-ps/run
===================================================================
--- src/contrib/chukwa/tools/service/chukwa-ps/run (revision 0)
+++ src/contrib/chukwa/tools/service/chukwa-ps/run (revision 0)
@@ -0,0 +1,5 @@
+#!/bin/sh
+CHUKWA_CONF_DIR=/usr/local/chukwa/conf
+
+exec setuidgid gmon /usr/local/chukwa/bin/exec-data-loader.sh --config
$CHUKWA_CONF_DIR ps
+
> Enhance process information collection
> ---------------------------------------
>
> Key: CHUKWA-17
> URL: https://issues.apache.org/jira/browse/CHUKWA-17
> Project: Hadoop Chukwa
> Issue Type: New Feature
> Environment: Redhat EL 5.1, Java 6
> Reporter: Cheng
> Priority: Minor
>
> We are currently getting the process information from "top", and a given
> process entry looks like:
> 28288 gmon 18 0 2225m 277m 8876 S 0 7.0 1:38.56 java
> However, that's still not complete. For it to be truly useful, we would want
> something like,
> gmon 28288 28244 0 Nov13 ? 00:01:38 /grid/0/java/jdk/bin/java
> -Xms1000M -Xmx2000M -DAPP=collector
> -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=/gri........
> -DCHUKWA_CONF_DIR=/gri......./../conf
> -DCHUKWA_LOG_DIR=/grid.........bin/../var/log -classpath
> :/grid..........
> We can get that information by using the command below:
> ps axo pid,user,vsize,size,pcpu,pmem,time,start_time,start,cmd
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.