Author: eyang
Date: Fri Feb 27 00:24:54 2009
New Revision: 748358

URL: http://svn.apache.org/viewvc?rev=748358&view=rev
Log:
HADOOP-5302.  Added check for record bigger than MAX_READ_SIZE.
(Contributed by Jerome Boulon via eyang)

Added:
    
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
Modified:
    
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
    
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java

Modified: 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=748358&r1=748357&r2=748358&view=diff
==============================================================================
--- 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
 (original)
+++ 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
 Fri Feb 27 00:24:54 2009
@@ -18,20 +18,20 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
-import java.io.*;
-import java.util.Timer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 /**
  * An adaptor that repeatedly tails a specified file, sending the new bytes.
  * This class does not split out records, but just sends everything up to end 
of file.
@@ -200,9 +200,10 @@
                if (len >= fileReadOffset) {
                        if(offsetOfFirstByte>fileReadOffset) {
                                // If the file rotated, the recorded 
offsetOfFirstByte is greater than file size,
-                               // reset the first byte position to beginning 
of the file.
-                               offsetOfFirstByte = 0L;                         
+                               // reset the first byte position to beginning 
of the file.      
                                fileReadOffset=0;
+                               offsetOfFirstByte = 0L;       
+                               log.warn("offsetOfFirstByte>fileReadOffset, 
resetting offset to 0");
                        }
                        
                        log.debug("Adaptor|" + adaptorID + "|seeking|" + 
fileReadOffset );
@@ -241,13 +242,26 @@
                        
                        long curOffset = fileReadOffset;
                        
-                       reader.read(buf);
+                       int bufferRead = reader.read(buf);
                        assert reader.getFilePointer() == fileReadOffset + 
bufSize : " event size arithmetic is broken: "
-                                       + " pointer is "
-                                       + reader.getFilePointer()
-                                       + " but offset is " + fileReadOffset + 
bufSize;
-           
+                         + " pointer is "
+                         + reader.getFilePointer()
+                         + " but offset is " + fileReadOffset + bufSize;
+
                        int bytesUsed = extractRecords(dest, fileReadOffset + 
offsetOfFirstByte, buf);
+
+                       // ===   WARNING   ===
+                       // If we couldn't find a complete record AND
+                       // we cannot read more, i.e bufferRead == MAX_READ_SIZE 
+                       // it's because the record is too BIG
+                       // So log.warn, and drop current buffer so we can keep 
moving
+                       // instead of being stopped at that point for ever
+                       if ( bytesUsed == 0 && bufferRead ==  MAX_READ_SIZE) {
+                         log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed 
== 0, droping current buffer: startOffset=" 
+                             + curOffset + ", MAX_READ_SIZE=" + MAX_READ_SIZE 
+ ", for " + toWatch.getPath());
+                         bytesUsed = buf.length;
+                       }
+
                        fileReadOffset = fileReadOffset + bytesUsed;
                        
                        
@@ -255,13 +269,13 @@
                        
                        
                } else {
-                       // file has rotated and no detection
-                       reader.close();
-                       reader=null;
-                       fileReadOffset = 0L;
-                       offsetOfFirstByte = 0L;
-                       hasMoreData = true;
-                               log.warn("Adaptor|" + adaptorID +"| file has 
rotated and no detection - reset counters to 0L");         
+                 // file has rotated and no detection
+                 reader.close();
+                 reader=null;
+                 fileReadOffset = 0L;
+                 offsetOfFirstByte = 0L;
+                 hasMoreData = true;
+                 log.warn("Adaptor|" + adaptorID +"| file: " + 
toWatch.getPath() +", has rotated and no detection - reset counters to 0L");    
        
                }
            } catch (IOException e) {
                log.warn("failure reading " + toWatch, e);

Modified: 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=748358&r1=748357&r2=748358&view=diff
==============================================================================
--- 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
 (original)
+++ 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
 Fri Feb 27 00:24:54 2009
@@ -17,74 +17,82 @@
  */
 
 package org.apache.hadoop.chukwa.datacollection.agent;
-import java.net.*;
-import java.io.*;
 
-import org.apache.hadoop.chukwa.datacollection.adaptor.*;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.log4j.Logger;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
 import java.util.Map;
 
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.log4j.Logger;
+
 /**
- * Class to handle the agent control protocol.
- * This is a simple line-oriented ASCII protocol, that is designed
- * to be easy to work with both programmatically and via telnet.
- *
- *  The port to bind to can be specified by setting option
- *     chukwaAgent.agent.control.port
+ * Class to handle the agent control protocol. This is a simple line-oriented
+ * ASCII protocol, that is designed to be easy to work with both
+ * programmatically and via telnet.
+ * 
+ * The port to bind to can be specified by setting option
+ * chukwaAgent.agent.control.port.
+ * A port of 0 creates a socket on any free port.
  */
 public class AgentControlSocketListener extends Thread {
 
+  static Logger log = Logger.getLogger(AgentControlSocketListener.class);
 
-  static Logger log= Logger.getLogger(AgentControlSocketListener.class);
-  
-  ChukwaAgent agent;
-  int portno;
-  ServerSocket s= null;
+  protected ChukwaAgent agent;
+  protected int portno;
+  protected ServerSocket s = null;
   volatile boolean closing = false;
-  
-  private class ListenThread extends Thread
-  {
+
+  private class ListenThread extends Thread {
     Socket connection;
-    ListenThread(Socket conn)  {
+
+    ListenThread(Socket conn) {
       connection = conn;
       this.setName("listen thread for " + connection.getRemoteSocketAddress());
     }
-    
-    public void run()  {
+
+    public void run() {
       try {
-      InputStream in = connection.getInputStream();
-      BufferedReader br = new BufferedReader(new InputStreamReader(in));
-      PrintStream out = new PrintStream(new 
BufferedOutputStream(connection.getOutputStream()));
-      //out.println("You are connected to the chukwa agent on "+ 
InetAddress.getLocalHost().getCanonicalHostName());
-      //out.flush();
-      String cmd = null;
-      while((cmd = br.readLine()) != null)  {
-        processCommand(cmd, out);
-      }
-      if (log.isDebugEnabled())
-               { log.debug("control connection closed");}
-      }
-      catch(SocketException e ) {
-        if(e.getMessage().equals("Socket Closed"))
+        InputStream in = connection.getInputStream();
+        BufferedReader br = new BufferedReader(new InputStreamReader(in));
+        PrintStream out = new PrintStream(new 
BufferedOutputStream(connection.getOutputStream()));
+        String cmd = null;
+        while ((cmd = br.readLine()) != null) {
+          processCommand(cmd, out);
+        }
+        if (log.isDebugEnabled()) {
+          log.debug("control connection closed");
+        }
+      } catch (SocketException e) {
+        if (e.getMessage().equals("Socket Closed"))
           log.info("control socket closed");
-      } catch(IOException e)  {
+      } catch (IOException e) {
         log.warn("a control connection broke", e);
       }
     }
-    
+
     /**
      * process a protocol command
+     * 
      * @param cmd the command given by the user
-     * @param out  a PrintStream writing to the socket
+     * @param out a PrintStream writing to the socket
      * @throws IOException
      */
-    public void processCommand(String cmd, PrintStream out) throws IOException 
 {
+    public void processCommand(String cmd, PrintStream out) throws IOException 
{
       String[] words = cmd.split("\\s+");
-      if (log.isDebugEnabled())
-               { log.debug("command from " + 
connection.getRemoteSocketAddress() + ":"+ cmd);}
-      
-      if(words[0].equalsIgnoreCase("help"))  {
+      if (log.isDebugEnabled()) {
+        log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ 
cmd);
+      }
+
+      if (words[0].equalsIgnoreCase("help")) {
         out.println("you're talking to the Chukwa agent.  Commands available: 
");
         out.println("add [adaptorname] [args] [offset] -- start an adaptor");
         out.println("shutdown [adaptornumber]  -- graceful stop");
@@ -95,149 +103,151 @@
         out.println("reloadCollectors -- reload the list of collectors");
         out.println("help -- print this message");
         out.println("\t Command names are case-blind.");
-      }
-      else if(words[0].equalsIgnoreCase("close"))  {
+      } else if (words[0].equalsIgnoreCase("close")) {
         connection.close();
-      }
-      else if(words[0].equalsIgnoreCase("add"))   {
+      } else if (words[0].equalsIgnoreCase("add")) {
         long newID = agent.processCommand(cmd);
-        if(newID != -1)
-          out.println("OK add completed; new ID is " +newID);
+        if (newID != -1)
+          out.println("OK add completed; new ID is " + newID);
         else
           out.println("failed to start adaptor...check logs for details");
-      }
-      else if(words[0].equalsIgnoreCase("shutdown"))  {
-        if(words.length < 2) {
+      } else if (words[0].equalsIgnoreCase("shutdown")) {
+        if (words.length < 2) {
           out.println("need to specify an adaptor to shut down, by number");
-        }
-        else {
+        } else {
           long num = Long.parseLong(words[1]);
           long offset = agent.stopAdaptor(num, true);
-          if(offset != -1)
-            out.println("OK adaptor "+ num+ " stopping gracefully at " + 
offset);
+          if (offset != -1)
+            out.println("OK adaptor " + num + " stopping gracefully at "
+                + offset);
           else
             out.println("FAIL: perhaps adaptor " + num + " does not exist");
         }
-      }     
-      else if(words[0].equalsIgnoreCase("stop"))  {
-        if(words.length < 2) {
+      } else if (words[0].equalsIgnoreCase("stop")) {
+        if (words.length < 2) {
           out.println("need to specify an adaptor to shut down, by number");
         } else {
           long num = Long.parseLong(words[1]);
           agent.stopAdaptor(num, false);
-          out.println("OK adaptor "+ num+ " stopped");
+          out.println("OK adaptor " + num + " stopped");
         }
-      }
-      else if(words[0].equalsIgnoreCase("reloadCollectors"))  {
-            agent.getConnector().reloadConfiguration();
-            out.println("OK reloadCollectors done");
-        }else if(words[0].equalsIgnoreCase("list") )  {
+      } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
+        agent.getConnector().reloadConfiguration();
+        out.println("OK reloadCollectors done");
+      } else if (words[0].equalsIgnoreCase("list")) {
         Map<Long, Adaptor> adaptorsByNumber = agent.getAdaptorList();
-        
-        if (log.isDebugEnabled())
-               { log.debug("number of adaptors: " + adaptorsByNumber.size());}
-        
-        synchronized(adaptorsByNumber)   {
-          for(Map.Entry<Long, Adaptor> a: adaptorsByNumber.entrySet())  {
-            try{
+
+        if (log.isDebugEnabled()) {
+          log.debug("number of adaptors: " + adaptorsByNumber.size());
+        }
+
+        synchronized (adaptorsByNumber) {
+          for (Map.Entry<Long, Adaptor> a : adaptorsByNumber.entrySet()) {
+            try {
               out.print(a.getKey());
               out.print(") ");
               out.print(" ");
               out.println(formatAdaptorStatus(a.getValue()));
-            }  catch(AdaptorException e)  {
+            } catch (AdaptorException e) {
               log.error(e);
             }
           }
           out.println("");
         }
-      } else if(words[0].equalsIgnoreCase("stopagent")) {
+      } else if (words[0].equalsIgnoreCase("stopagent")) {
         out.println("stopping agent process.");
         connection.close();
         agent.shutdown(true);
-      }
-      else  {
+      } else {
         log.warn("unknown command " + words[0]);
         out.println("unknown command" + words[0]);
         out.println("say 'help' for a list of legal commands");
       }
       out.flush();
     }
-    
+
   }
+
   /**
    * Initializes listener, but does not bind to socket.
+   * 
    * @param a the agent to control
    */
-  public AgentControlSocketListener(ChukwaAgent a)
-  {
-    ChukwaConfiguration conf = new ChukwaConfiguration();
-    this.setDaemon(false); //to keep the local agent alive
-    agent = a;
-    portno = conf.getInt("chukwaAgent.agent.control.port", 9093);
-    log.info("AgentControlSocketListerner port set to " + portno);
+  public AgentControlSocketListener(ChukwaAgent agent) {
+
+    this.setDaemon(false); // to keep the local agent alive
+    this.agent = agent;
+    this.portno = 
agent.getConfiguration().getInt("chukwaAgent.agent.control.port", 9093);
+    log.info("AgentControlSocketListerner ask for port: " + portno);
     this.setName("control socket listener");
   }
-  
-  public String formatAdaptorStatus(Adaptor a)  throws AdaptorException  {
+
+  public String formatAdaptorStatus(Adaptor a) throws AdaptorException {
     return a.getClass().getCanonicalName() + " " + a.getCurrentStatus();
   }
 
   /**
    * Binds to socket, starts looping listening for commands
    */
-  public void run()  {
+  public void run() {
     try {
-      if(!isBound()) 
+      if (!isBound())
         tryToBind();
-    } catch(IOException e) {
+    } catch (IOException e) {
       return;
     }
-    
-    while(!closing)
-    {
+
+    while (!closing) {
       try {
         Socket connection = s.accept();
-        if (log.isDebugEnabled())
-               { log.debug("new connection from " + 
connection.getInetAddress());}
+        if (log.isDebugEnabled()) {
+          log.debug("new connection from " + connection.getInetAddress());
+        }
         ListenThread l = new ListenThread(connection);
         l.setDaemon(true);
         l.start();
-      } catch(IOException e)  {
-        if(!closing)
-          log.warn("control socket error: ",e );
+      } catch (IOException e) {
+        if (!closing)
+          log.warn("control socket error: ", e);
         else {
-          log.info("shutting down listen thread due to shutdown() call");
+          log.warn("shutting down listen thread due to shutdown() call");
           break;
         }
       }
-    }//end while
+    }// end while
   }
+
   /**
    * Close the control socket, and exit. Triggers graceful thread shutdown.
    */
-  public void shutdown()  {
+  public void shutdown() {
     closing = true;
-    try{
-      if(s != null)
+    try {
+      if (s != null)
         s.close();
       s = null;
-    }
-    catch(IOException e)
-    {}  //ignore exception on close
+    } catch (IOException e) {
+    } // ignore exception on close
   }
 
   public boolean isBound() {
-    return s!= null &&  s.isBound();
+    return s != null && s.isBound();
   }
 
-  public void tryToBind() throws IOException
-  {
-    s= new ServerSocket(portno);
-    if(s.isBound())
-      log.debug("socket bound to " + portno);
+  public void tryToBind() throws IOException {
+    s = new ServerSocket(portno);
+    portno = s.getLocalPort();
+    if (s.isBound())
+      log.info("socket bound to " + s.getLocalPort());
     else
-      log.debug("socket isn't bound");
-     
+      log.info("socket isn't bound");
+  }
+
+  public int getPort() {
+    if (!s.isBound()) {
+      return -1;
+    } else {
+      return portno;
+    }
   }
-  
 }

Modified: 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=748358&r1=748357&r2=748358&view=diff
==============================================================================
--- 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
 (original)
+++ 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
 Fri Feb 27 00:24:54 2009
@@ -102,6 +102,10 @@
 
   private final AgentControlSocketListener controlSock;
 
+  public int getControllerPort() {
+    return controlSock.getPort();
+  }
+  
   /**
    * @param args
    * @throws AdaptorException

Added: 
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java?rev=748358&view=auto
==============================================================================
--- 
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
 (added)
+++ 
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptorBigRecord.java
 Fri Feb 27 00:24:54 2009
@@ -0,0 +1,103 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import 
org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestFileTailingAdaptorBigRecord extends TestCase {
+
+  ChunkCatcherConnector chunks;
+
+  public void testBigRecord() {
+    File f = null;
+    try {
+      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+      if (!tempDir.exists()) {
+        tempDir.mkdirs();
+      }
+      String logFile = tempDir.getPath() + "/Chukwa-bigRecord.txt";
+      f = makeTestFile(logFile);
+
+      chunks = new ChunkCatcherConnector();
+      chunks.start();
+
+      // Remove any adaptor left over from previous run
+      ChukwaConfiguration cc = new ChukwaConfiguration();
+      cc.set("chukwaAgent.agent.control.port", "0");
+      cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
+      ChukwaAgent agent = new ChukwaAgent(cc);
+      int portno = agent.getControllerPort();
+      while (portno == -1) {
+        Thread.sleep(1000);
+        portno = agent.getControllerPort();
+      }
+
+      // System.out.println("Port number:" + portno);
+      ChukwaAgentController cli = new ChukwaAgentController("localhost", 
portno);
+      cli.removeAll();
+      // sleep for some time to make sure we don't get chunk from existing
+      // streams
+      Thread.sleep(5000);
+      long adaptorId = agent
+          .processCommand("add 
org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"
+              + " BigRecord " + logFile + " 0");
+      assertTrue(adaptorId != -1);
+
+      boolean record8Found = false;
+      Chunk c = null;
+      // Keep reading until record8
+      // If the adaptor is stopped then Junit will fail with a timeOut
+      while (!record8Found) {
+        c = chunks.waitForAChunk();
+        String data = new String(c.getData());
+        if (c.getDataType().equals("BigRecord")
+            && data.indexOf("8 abcdefghijklmnopqrstuvwxyz") >= 0) {
+          record8Found = true;
+        }
+      }
+      agent.getAdaptorList().get(adaptorId).shutdown();
+      agent.shutdown();
+    } catch (Exception e) {
+      Assert.fail("Exception in testBigRecord" + e.getMessage());
+    } finally {
+      if (f != null) {
+        f.delete();
+      }
+    }
+  }
+
+  private File makeTestFile(String name) throws IOException {
+    File tmpOutput = new File(name);
+    FileOutputStream fos = new FileOutputStream(tmpOutput);
+
+    PrintWriter pw = new PrintWriter(fos);
+    for (int i = 0; i < 5; ++i) {
+      pw.print(i + " ");
+      pw.println("abcdefghijklmnopqrstuvwxyz");
+    }
+    pw.print("6 ");
+    for (int i = 0; i < 10; ++i) {
+      pw.print("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz");
+    }
+    pw.print("\n");
+    pw.print("7 ");
+    pw.println("abcdefghijklmnopqrstuvwxyz");
+    pw.print("8 ");
+    pw.println("abcdefghijklmnopqrstuvwxyz");
+
+    pw.flush();
+    pw.close();
+    return tmpOutput;
+  }
+
+}


Reply via email to