Author: eyang
Date: Fri Feb 27 00:24:54 2009
New Revision: 748358
URL: http://svn.apache.org/viewvc?rev=748358&view=rev
Log:
HADOOP-5302. Added check for record bigger than MAX_READ_SIZE.
(Contributed by Jerome Boulon via eyang)
Added:
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=748358&r1=748357&r2=748358&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Fri Feb 27 00:24:54 2009
@@ -18,20 +18,20 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
-import java.io.*;
-import java.util.Timer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
/**
* An adaptor that repeatedly tails a specified file, sending the new bytes.
* This class does not split out records, but just sends everything up to end of file.
@@ -200,9 +200,10 @@
if (len >= fileReadOffset) {
if(offsetOfFirstByte>fileReadOffset) {
// If the file rotated, the recorded offsetOfFirstByte is greater than file size,
- // reset the first byte position to beginning of the file.
- offsetOfFirstByte = 0L;
+ // reset the first byte position to beginning of the file.
fileReadOffset=0;
+ offsetOfFirstByte = 0L;
+ log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
}
log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset );
@@ -241,13 +242,26 @@
long curOffset = fileReadOffset;
- reader.read(buf);
+ int bufferRead = reader.read(buf);
assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
- + " pointer is "
- + reader.getFilePointer()
- + " but offset is " + fileReadOffset + bufSize;
-
+ + " pointer is "
+ + reader.getFilePointer()
+ + " but offset is " + fileReadOffset + bufSize;
+
int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
+
+ // === WARNING ===
+ // If we couldn't found a complete record AND
+ // we cannot read more, i.e bufferRead == MAX_READ_SIZE
+ // it's because the record is too BIG
+ // So log.warn, and drop current buffer so we can keep moving
+ // instead of being stopped at that point for ever
+ if ( bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
+ log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset="
+ + curOffset + ", MAX_READ_SIZE=" + MAX_READ_SIZE + ", for " + toWatch.getPath());
+ bytesUsed = buf.length;
+ }
+
fileReadOffset = fileReadOffset + bytesUsed;
@@ -255,13 +269,13 @@
} else {
- // file has rotated and no detection
- reader.close();
- reader=null;
- fileReadOffset = 0L;
- offsetOfFirstByte = 0L;
- hasMoreData = true;
- log.warn("Adaptor|" + adaptorID +"| file has rotated and no detection - reset counters to 0L");
+ // file has rotated and no detection
+ reader.close();
+ reader=null;
+ fileReadOffset = 0L;
+ offsetOfFirstByte = 0L;
+ hasMoreData = true;
+ log.warn("Adaptor|" + adaptorID +"| file: " + toWatch.getPath() +", has rotated and no detection - reset counters to 0L");
}
} catch (IOException e) {
log.warn("failure reading " + toWatch, e);
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=748358&r1=748357&r2=748358&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Fri Feb 27 00:24:54 2009
@@ -17,74 +17,82 @@
*/
package org.apache.hadoop.chukwa.datacollection.agent;
-import java.net.*;
-import java.io.*;
-import org.apache.hadoop.chukwa.datacollection.adaptor.*;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.log4j.Logger;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
import java.util.Map;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.log4j.Logger;
+
/**
- * Class to handle the agent control protocol.
- * This is a simple line-oriented ASCII protocol, that is designed
- * to be easy to work with both programmatically and via telnet.
- *
- * The port to bind to can be specified by setting option
- * chukwaAgent.agent.control.port
+ * Class to handle the agent control protocol. This is a simple line-oriented
+ * ASCII protocol, that is designed to be easy to work with both
+ * programmatically and via telnet.
+ *
+ * The port to bind to can be specified by setting option
+ * chukwaAgent.agent.control.port.
+ * A port of 0 creates a socket on any free port.
*/
public class AgentControlSocketListener extends Thread {
+ static Logger log = Logger.getLogger(AgentControlSocketListener.class);
- static Logger log= Logger.getLogger(AgentControlSocketListener.class);
-
- ChukwaAgent agent;
- int portno;
- ServerSocket s= null;
+ protected ChukwaAgent agent;
+ protected int portno;
+ protected ServerSocket s = null;
volatile boolean closing = false;
-
- private class ListenThread extends Thread
- {
+
+ private class ListenThread extends Thread {
Socket connection;
- ListenThread(Socket conn) {
+
+ ListenThread(Socket conn) {
connection = conn;
this.setName("listen thread for " + connection.getRemoteSocketAddress());
}
-
- public void run() {
+
+ public void run() {
try {
- InputStream in = connection.getInputStream();
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
- //out.println("You are connected to the chukwa agent on "+ InetAddress.getLocalHost().getCanonicalHostName());
- //out.flush();
- String cmd = null;
- while((cmd = br.readLine()) != null) {
- processCommand(cmd, out);
- }
- if (log.isDebugEnabled())
- { log.debug("control connection closed");}
- }
- catch(SocketException e ) {
- if(e.getMessage().equals("Socket Closed"))
+ InputStream in = connection.getInputStream();
+ BufferedReader br = new BufferedReader(new InputStreamReader(in));
+ PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
+ String cmd = null;
+ while ((cmd = br.readLine()) != null) {
+ processCommand(cmd, out);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("control connection closed");
+ }
+ } catch (SocketException e) {
+ if (e.getMessage().equals("Socket Closed"))
log.info("control socket closed");
- } catch(IOException e) {
+ } catch (IOException e) {
log.warn("a control connection broke", e);
}
}
-
+
/**
* process a protocol command
+ *
* @param cmd the command given by the user
- * @param out a PrintStream writing to the socket
+ * @param out a PrintStream writing to the socket
* @throws IOException
*/
- public void processCommand(String cmd, PrintStream out) throws IOException {
+ public void processCommand(String cmd, PrintStream out) throws IOException {
String[] words = cmd.split("\\s+");
- if (log.isDebugEnabled())
- { log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);}
-
- if(words[0].equalsIgnoreCase("help")) {
+ if (log.isDebugEnabled()) {
+ log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);
+ }
+
+ if (words[0].equalsIgnoreCase("help")) {
out.println("you're talking to the Chukwa agent. Commands available: ");
out.println("add [adaptorname] [args] [offset] -- start an adaptor");
out.println("shutdown [adaptornumber] -- graceful stop");
@@ -95,149 +103,151 @@
out.println("reloadCollectors -- reload the list of collectors");
out.println("help -- print this message");
out.println("\t Command names are case-blind.");
- }
- else if(words[0].equalsIgnoreCase("close")) {
+ } else if (words[0].equalsIgnoreCase("close")) {
connection.close();
- }
- else if(words[0].equalsIgnoreCase("add")) {
+ } else if (words[0].equalsIgnoreCase("add")) {
long newID = agent.processCommand(cmd);
- if(newID != -1)
- out.println("OK add completed; new ID is " +newID);
+ if (newID != -1)
+ out.println("OK add completed; new ID is " + newID);
else
out.println("failed to start adaptor...check logs for details");
- }
- else if(words[0].equalsIgnoreCase("shutdown")) {
- if(words.length < 2) {
+ } else if (words[0].equalsIgnoreCase("shutdown")) {
+ if (words.length < 2) {
out.println("need to specify an adaptor to shut down, by number");
- }
- else {
+ } else {
long num = Long.parseLong(words[1]);
long offset = agent.stopAdaptor(num, true);
- if(offset != -1)
- out.println("OK adaptor "+ num+ " stopping gracefully at " + offset);
+ if (offset != -1)
+ out.println("OK adaptor " + num + " stopping gracefully at "
+ + offset);
else
out.println("FAIL: perhaps adaptor " + num + " does not exist");
}
- }
- else if(words[0].equalsIgnoreCase("stop")) {
- if(words.length < 2) {
+ } else if (words[0].equalsIgnoreCase("stop")) {
+ if (words.length < 2) {
out.println("need to specify an adaptor to shut down, by number");
} else {
long num = Long.parseLong(words[1]);
agent.stopAdaptor(num, false);
- out.println("OK adaptor "+ num+ " stopped");
+ out.println("OK adaptor " + num + " stopped");
}
- }
- else if(words[0].equalsIgnoreCase("reloadCollectors")) {
- agent.getConnector().reloadConfiguration();
- out.println("OK reloadCollectors done");
- }else if(words[0].equalsIgnoreCase("list") ) {
+ } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
+ agent.getConnector().reloadConfiguration();
+ out.println("OK reloadCollectors done");
+ } else if (words[0].equalsIgnoreCase("list")) {
Map<Long, Adaptor> adaptorsByNumber = agent.getAdaptorList();
-
- if (log.isDebugEnabled())
- { log.debug("number of adaptors: " + adaptorsByNumber.size());}
-
- synchronized(adaptorsByNumber) {
- for(Map.Entry<Long, Adaptor> a: adaptorsByNumber.entrySet()) {
- try{
+
+ if (log.isDebugEnabled()) {
+ log.debug("number of adaptors: " + adaptorsByNumber.size());
+ }
+
+ synchronized (adaptorsByNumber) {
+ for (Map.Entry<Long, Adaptor> a : adaptorsByNumber.entrySet()) {
+ try {
out.print(a.getKey());
out.print(") ");
out.print(" ");
out.println(formatAdaptorStatus(a.getValue()));
- } catch(AdaptorException e) {
+ } catch (AdaptorException e) {
log.error(e);
}
}
out.println("");
}
- } else if(words[0].equalsIgnoreCase("stopagent")) {
+ } else if (words[0].equalsIgnoreCase("stopagent")) {
out.println("stopping agent process.");
connection.close();
agent.shutdown(true);
- }
- else {
+ } else {
log.warn("unknown command " + words[0]);
out.println("unknown command" + words[0]);
out.println("say 'help' for a list of legal commands");
}
out.flush();
}
-
+
}
+
/**
* Initializes listener, but does not bind to socket.
+ *
* @param a the agent to control
*/
- public AgentControlSocketListener(ChukwaAgent a)
- {
- ChukwaConfiguration conf = new ChukwaConfiguration();
- this.setDaemon(false); //to keep the local agent alive
- agent = a;
- portno = conf.getInt("chukwaAgent.agent.control.port", 9093);
- log.info("AgentControlSocketListerner port set to " + portno);
+ public AgentControlSocketListener(ChukwaAgent agent) {
+
+ this.setDaemon(false); // to keep the local agent alive
+ this.agent = agent;
+ this.portno = agent.getConfiguration().getInt("chukwaAgent.agent.control.port", 9093);
+ log.info("AgentControlSocketListerner ask for port: " + portno);
this.setName("control socket listener");
}
-
- public String formatAdaptorStatus(Adaptor a) throws AdaptorException {
+
+ public String formatAdaptorStatus(Adaptor a) throws AdaptorException {
return a.getClass().getCanonicalName() + " " + a.getCurrentStatus();
}
/**
* Binds to socket, starts looping listening for commands
*/
- public void run() {
+ public void run() {
try {
- if(!isBound())
+ if (!isBound())
tryToBind();
- } catch(IOException e) {
+ } catch (IOException e) {
return;
}
-
- while(!closing)
- {
+
+ while (!closing) {
try {
Socket connection = s.accept();
- if (log.isDebugEnabled())
- { log.debug("new connection from " + connection.getInetAddress());}
+ if (log.isDebugEnabled()) {
+ log.debug("new connection from " + connection.getInetAddress());
+ }
ListenThread l = new ListenThread(connection);
l.setDaemon(true);
l.start();
- } catch(IOException e) {
- if(!closing)
- log.warn("control socket error: ",e );
+ } catch (IOException e) {
+ if (!closing)
+ log.warn("control socket error: ", e);
else {
- log.info("shutting down listen thread due to shutdown() call");
+ log.warn("shutting down listen thread due to shutdown() call");
break;
}
}
- }//end while
+ }// end while
}
+
/**
* Close the control socket, and exit. Triggers graceful thread shutdown.
*/
- public void shutdown() {
+ public void shutdown() {
closing = true;
- try{
- if(s != null)
+ try {
+ if (s != null)
s.close();
s = null;
- }
- catch(IOException e)
- {} //ignore exception on close
+ } catch (IOException e) {
+ } // ignore exception on close
}
public boolean isBound() {
- return s!= null && s.isBound();
+ return s != null && s.isBound();
}
- public void tryToBind() throws IOException
- {
- s= new ServerSocket(portno);
- if(s.isBound())
- log.debug("socket bound to " + portno);
+ public void tryToBind() throws IOException {
+ s = new ServerSocket(portno);
+ portno = s.getLocalPort();
+ if (s.isBound())
+ log.info("socket bound to " + s.getLocalPort());
else
- log.debug("socket isn't bound");
-
+ log.info("socket isn't bound");
+ }
+
+ public int getPort() {
+ if (!s.isBound()) {
+ return -1;
+ } else {
+ return portno;
+ }
}
-
}
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=748358&r1=748357&r2=748358&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Fri Feb 27 00:24:54 2009
@@ -102,6 +102,10 @@
private final AgentControlSocketListener controlSock;
+ public int getControllerPort() {
+ return controlSock.getPort();
+ }
+
/**
* @param args
* @throws AdaptorException
Added:
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java?rev=748358&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java Fri Feb 27 00:24:54 2009
@@ -0,0 +1,103 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestFileTailingAdaptorBigRecord extends TestCase {
+
+ ChunkCatcherConnector chunks;
+
+ public void testBigRecord() {
+ File f = null;
+ try {
+ File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+ if (!tempDir.exists()) {
+ tempDir.mkdirs();
+ }
+ String logFile = tempDir.getPath() + "/Chukwa-bigRecord.txt";
+ f = makeTestFile(logFile);
+
+ chunks = new ChunkCatcherConnector();
+ chunks.start();
+
+ // Remove any adaptor left over from previous run
+ ChukwaConfiguration cc = new ChukwaConfiguration();
+ cc.set("chukwaAgent.agent.control.port", "0");
+ cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
+ ChukwaAgent agent = new ChukwaAgent(cc);
+ int portno = agent.getControllerPort();
+ while (portno == -1) {
+ Thread.sleep(1000);
+ portno = agent.getControllerPort();
+ }
+
+ // System.out.println("Port number:" + portno);
+ ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+ cli.removeAll();
+ // sleep for some time to make sure we don't get chunk from existing
+ // streams
+ Thread.sleep(5000);
+ long adaptorId = agent
+ .processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"
+ + " BigRecord " + logFile + " 0");
+ assertTrue(adaptorId != -1);
+
+ boolean record8Found = false;
+ Chunk c = null;
+ // Keep reading until record8
+ // If the adaptor is stopped then Junit will fail with a timeOut
+ while (!record8Found) {
+ c = chunks.waitForAChunk();
+ String data = new String(c.getData());
+ if (c.getDataType().equals("BigRecord")
+ && data.indexOf("8 abcdefghijklmnopqrstuvwxyz") >= 0) {
+ record8Found = true;
+ }
+ }
+ agent.getAdaptorList().get(adaptorId).shutdown();
+ agent.shutdown();
+ } catch (Exception e) {
+ Assert.fail("Exception in testBigRecord" + e.getMessage());
+ } finally {
+ if (f != null) {
+ f.delete();
+ }
+ }
+ }
+
+ private File makeTestFile(String name) throws IOException {
+ File tmpOutput = new File(name);
+ FileOutputStream fos = new FileOutputStream(tmpOutput);
+
+ PrintWriter pw = new PrintWriter(fos);
+ for (int i = 0; i < 5; ++i) {
+ pw.print(i + " ");
+ pw.println("abcdefghijklmnopqrstuvwxyz");
+ }
+ pw.print("6 ");
+ for (int i = 0; i < 10; ++i) {
+ pw.print("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz");
+ }
+ pw.print("\n");
+ pw.print("7 ");
+ pw.println("abcdefghijklmnopqrstuvwxyz");
+ pw.print("8 ");
+ pw.println("abcdefghijklmnopqrstuvwxyz");
+
+ pw.flush();
+ pw.close();
+ return tmpOutput;
+ }
+
+}