Author: cdouglas
Date: Fri Jan 16 19:37:43 2009
New Revision: 735222
URL: http://svn.apache.org/viewvc?rev=735222&view=rev
Log:
HADOOP-4993. Fix Chukwa agent configuration and startup to make it both
more modular and testable. Contributed by Ari Rabkin
Added:
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=735222&r1=735221&r2=735222&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jan 16 19:37:43 2009
@@ -611,6 +611,9 @@
HADOOP-4818. Pass user config to instrumentation API. (Eric Yang via
cdouglas)
+ HADOOP-4993. Fix Chukwa agent configuration and startup to make it both
+ more modular and testable. (Ari Rabkin via cdouglas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=735222&r1=735221&r2=735222&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Fri Jan 16 19:37:43 2009
@@ -41,7 +41,6 @@
*/
public class ChukwaAgent
{
- boolean DO_CHECKPOINT_RESTORE = true;
//boolean WRITE_CHECKPOINTS = true;
static Logger log = Logger.getLogger(ChukwaAgent.class);
@@ -87,9 +86,7 @@
private File checkpointDir; // lock this object to indicate checkpoint in
// progress
- private File initialAdaptors;
private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
- private int CHECKPOINT_INTERVAL_MS; // min interval at which to write
// checkpoints
private static String tags = "";
@@ -122,7 +119,8 @@
"[default collector URL]");
System.exit(0);
}
- ChukwaAgent localAgent = new ChukwaAgent();
+ Configuration conf = readConfig();
+ ChukwaAgent localAgent = new ChukwaAgent(conf);
if (agent.anotherAgentIsRunning())
{
@@ -135,15 +133,9 @@
int uriArgNumber = 0;
if (args.length > 0)
{
- if (args[0].equalsIgnoreCase("-noCheckPoint"))
- {
- agent.DO_CHECKPOINT_RESTORE = false;
- uriArgNumber = 1;
- }
if (args[uriArgNumber].equals("local"))
agent.connector = new ConsoleOutConnector(agent);
- else
- {
+ else {
if (!args[uriArgNumber].contains("://"))
args[uriArgNumber] = "http://" + args[uriArgNumber];
agent.connector = new HttpConnector(agent, args[uriArgNumber]);
@@ -182,33 +174,67 @@
{
return adaptorsByNumber.size();
}
+
- public ChukwaAgent() throws AlreadyRunningException
- {
- ChukwaAgent.agent = this;
+ public ChukwaAgent() throws AlreadyRunningException {
+ this(new Configuration());
+ }
- readConfig();
+ public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
+ ChukwaAgent.agent = this;
+ this.conf = conf;
// almost always just reading this; so use a ConcurrentHM.
// since we wrapped the offset, it's not a structural mod.
adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
adaptorsByNumber = new HashMap<Long, Adaptor>();
checkpointNumber = 0;
- try
- {
- if (DO_CHECKPOINT_RESTORE)
+
+ boolean DO_CHECKPOINT_RESTORE =
conf.getBoolean("chukwaAgent.checkpoint.enabled",
+ true);
+ CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
+ "chukwa_checkpoint_");
+ final int CHECKPOINT_INTERVAL_MS =
conf.getInt("chukwaAgent.checkpoint.interval",
+ 5000);
+
+ if(conf.get("chukwaAgent.checkpoint.dir") != null)
+ checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
+ else
+ DO_CHECKPOINT_RESTORE = false;
+
+ if (checkpointDir!= null && !checkpointDir.exists()) {
+ checkpointDir.mkdirs();
+ }
+ tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+
+ log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
+ log.info("Config - checkpointDir: [" + checkpointDir + "]");
+ log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
+ + "]");
+ log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE +
"]");
+ log.info("Config - tags: [" + tags + "]");
+
+ if (DO_CHECKPOINT_RESTORE) {
+ needNewCheckpoint = true;
+ log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+ }
+
+ File initialAdaptors = null;
+ if(conf.get("chukwaAgent.initial_adaptors") != null)
+ initialAdaptors= new File( conf.get("chukwaAgent.initial_adaptors"));
+
+ try {
+ if (DO_CHECKPOINT_RESTORE) {
restoreFromCheckpoint();
- } catch (IOException e)
- {
+ }
+ } catch (IOException e) {
log.warn("failed to restart from checkpoint: ", e);
}
- try
- {
- if (initialAdaptors != null && initialAdaptors.exists())
- readAdaptorsFile(initialAdaptors);
- } catch (IOException e)
- {
+ try {
+ if (initialAdaptors != null && initialAdaptors.exists() &&
checkpointNumber ==0)
+ readAdaptorsFile(initialAdaptors); //don't read after checkpoint
restore
+ } catch (IOException e) {
log.warn("couldn't read user-specified file "
+ initialAdaptors.getAbsolutePath());
}
@@ -221,8 +247,7 @@
controlSock.start(); // this sets us up as a daemon
log.info("control socket started on port " + controlSock.portno);
- if (CHECKPOINT_INTERVAL_MS > 0)
- {
+ if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir!= null) {
checkpointer = new Timer();
checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
}
@@ -242,7 +267,7 @@
// but can be arbitrarily many space
// delimited agent specific params )
// 4) offset
- Pattern addCmdPattern =
Pattern.compile("add\\s+(\\S+)\\s+(\\S+)\\s+(.*\\S)?\\s*(\\d+)\\s*");
+ Pattern addCmdPattern =
Pattern.compile("[aA][dD][dD]\\s+(\\S+)\\s+(\\S+)\\s+(.*\\S)?\\s*(\\d+)\\s*");
// FIXME: should handle bad lines here
public long processCommand(String cmd)
{
@@ -293,8 +318,10 @@
}
}
} else
- log.warn("only 'add' command supported in config files");
-
+ if(cmd.length() > 0)
+ log.warn("only 'add' command supported in config files");
+ //no warning for blank line
+
return -1;
}
@@ -351,7 +378,7 @@
}
}
- checkpointNumber = lowestIndex;
+ checkpointNumber = lowestIndex+1;
File checkpoint = new File(checkpointDir, lowestName);
readAdaptorsFile(checkpoint);
}
@@ -494,60 +521,31 @@
return connector;
}
- protected void readConfig() {
- conf = new Configuration();
-
- String chukwaHome = System.getenv("CHUKWA_HOME");
- if (chukwaHome == null) {
- chukwaHome = ".";
- }
-
- if (!chukwaHome.endsWith("/")) {
- chukwaHome = chukwaHome + File.separator;
- }
- log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
-
- String chukwaConf = System.getProperty("CHUKWA_CONF_DIR");
- if (chukwaConf == null) {
- chukwaConf = chukwaHome + "conf" + File.separator;
- }
- if (!chukwaHome.endsWith("/")) {
- chukwaHome = chukwaHome + File.separator;
- }
- if (!chukwaConf.endsWith("/")) {
- chukwaConf = chukwaConf + File.separator;
- }
- log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
-
- conf.addResource(new Path(chukwaConf + "chukwa-agent-conf.xml"));
- DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
- true);
- CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
- "chukwa_checkpoint_");
- checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome
- + "/var/"));
- CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
- 5000);
- if (!checkpointDir.exists())
- {
- checkpointDir.mkdirs();
- }
- tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+ protected static Configuration readConfig() {
+ Configuration conf = new Configuration();
- log.info("Config - chukwaHome: [" + chukwaHome + "]");
- log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
- log.info("Config - checkpointDir: [" + checkpointDir + "]");
- log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
- + "]");
- log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE +
"]");
- log.info("Config - tags: [" + tags + "]");
-
- if (DO_CHECKPOINT_RESTORE) {
- needNewCheckpoint = true;
- log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
- }
-
- initialAdaptors = new File(chukwaConf + "initial_adaptors");
+ String chukwaHomeName = System.getenv("CHUKWA_HOME");
+ if (chukwaHomeName == null) {
+ chukwaHomeName = "";
+ }
+ File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
+
+ log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
+
+ String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
+ File chukwaConf;
+ if (chukwaConfName != null)
+ chukwaConf = new File(chukwaConfName).getAbsoluteFile();
+ else
+ chukwaConf = new File(chukwaHome, "conf");
+
+ log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
+ File agentConf = new File(chukwaConf,"chukwa-agent-conf.xml");
+ conf.addResource(new Path(agentConf.getAbsolutePath()));
+ if(conf.get("chukwaAgent.checkpoint.dir") == null)
+ conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome,
"var").getAbsolutePath());
+ conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
"initial_adaptors").getAbsolutePath());
+ return conf;
}
public void shutdown() {
@@ -559,15 +557,15 @@
* explicitly. It probably should.
*/
public void shutdown(boolean exit) {
- if (checkpointer != null)
- checkpointer.cancel();
controlSock.shutdown(); // make sure we don't get new requests
- try {
- if (needNewCheckpoint)
+ if (checkpointer != null) {
+ checkpointer.cancel();
+ try {
writeCheckpoint(); // write a last checkpoint here, before stopping
- // adaptors
- } catch (IOException e) {
+ } catch (IOException e) {
+ }
}
+ // adaptors
synchronized (adaptorsByNumber) {
// shut down each adaptor
@@ -580,6 +578,8 @@
}
}
}
+ adaptorsByNumber.clear();
+ adaptorPositions.clear();
if (exit)
System.exit(0);
}
Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java?rev=735222&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java Fri Jan 16 19:37:43 2009
@@ -0,0 +1,76 @@
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.io.*;
+
+import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+public class TestAgentConfig extends TestCase {
+ public void testInitAdaptors_vs_Checkpoint() {
+ try {
+ //create two target files, foo and bar
+ File foo = File.createTempFile("foo", "test");
+ foo.deleteOnExit();
+ PrintStream ps = new PrintStream(new FileOutputStream(foo));
+ ps.println("foo");
+ ps.close();
+
+ File bar = File.createTempFile("bar", "test");
+ bar.deleteOnExit();
+ ps = new PrintStream(new FileOutputStream(bar));
+ ps.println("bar");
+ ps.close();
+
+ //initially, read foo
+ File initialAdaptors = File.createTempFile("initial", "adaptors");
+ initialAdaptors.deleteOnExit();
+ ps = new PrintStream(new FileOutputStream(initialAdaptors));
+ ps.println("add
org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8
raw 0 "
+ + foo.getAbsolutePath() +" 0 ");
+ ps.close();
+
+ Configuration conf = new Configuration();
+ conf.set("chukwaAgent.initial_adaptors",
initialAdaptors.getAbsolutePath());
+ File checkpointDir = File.createTempFile("chukwatest", "checkpoint");
+ checkpointDir.delete();
+ checkpointDir.mkdir();
+ checkpointDir.deleteOnExit();
+ conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getAbsolutePath());
+
+ ChukwaAgent agent = new ChukwaAgent(conf);
+ ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
+ conn.start();
+ assertEquals(1, agent.adaptorCount());//check that we processed initial
adaptors
+ assertNotNull(agent.getAdaptorList().get(1L));
+
assertTrue(agent.getAdaptorList().get(1L).getStreamName().contains("foo"));
+
+ System.out.println("---------------------done with first run, now
stopping");
+ agent.shutdown();
+ assertEquals(0, agent.adaptorCount());
+ //at this point, there should be a checkpoint file with a tailer reading
foo.
+ //we're going to rewrite initial adaptors to read bar; but after reboot
we should
+ //still only be looking at foo.
+ ps = new PrintStream(new FileOutputStream(initialAdaptors,
false));//overwrite
+ ps.println("add
org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8
raw 0 "
+ + bar.getAbsolutePath() +" 0 ");
+ ps.close();
+
+ System.out.println("---------------------restarting");
+ agent = new ChukwaAgent(conf);
+ conn = new ConsoleOutConnector(agent, true);
+ conn.start();
+ assertEquals(1, agent.adaptorCount());//check that we processed initial
adaptors
+ assertNotNull(agent.getAdaptorList().get(1L));
+
assertTrue(agent.getAdaptorList().get(1L).getStreamName().contains("foo"));
+ agent.shutdown();
+ System.out.println("---------------------done");
+
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ fail(e.toString());
+ }
+ }
+}