Author: eyang
Date: Fri Feb 27 00:15:16 2009
New Revision: 748357

URL: http://svn.apache.org/viewvc?rev=748357&view=rev
Log:
HADOOP-5313.  Added 10 minutes limit for terminator thread to finish.
Removed static for adaptor.
Removed static for ChunkReceiver.
Removed log(" ") from the busy loop.
Add debug logging.
(Contribute by Jerome Boulon via eyang)

Modified:
    
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java

Modified: 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=748357&r1=748356&r2=748357&view=diff
==============================================================================
--- 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
 (original)
+++ 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
 Fri Feb 27 00:15:16 2009
@@ -1,38 +1,52 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
-import java.io.IOException;
-import java.util.TimerTask;
-
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
-import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;
-import org.apache.hadoop.chukwa.inputtools.plugin.metrics.ExecHelper;
 import org.apache.log4j.Logger;
 
 public class TerminatorThread extends Thread {
        private static Logger log =Logger.getLogger(FileAdaptor.class);
-       private static FileTailingAdaptor adaptor = null;
-       private static ChunkReceiver eq = null;
+
+       private FileTailingAdaptor adaptor = null;
+       private ChunkReceiver eq = null;
        
        public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
                this.adaptor = adaptor;
                this.eq = eq;
        }
 
-       public void run() {
-           log.info("Terminator thread started.");
-           try {
-               while(adaptor.tailFile(eq)) {
-                       log.info("");
-               }
-               } catch (InterruptedException e) {
-                       log.info("Unable to send data to collector for 60 
seconds, force shutdown.");
-               }
-        log.info("Terminator finished.");
-        try {
-               adaptor.reader.close();
-        } catch (IOException ex) {
-               
+  public void run() {
+    
+    long endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+    int count = 0;
+    log.info("Terminator thread started." + adaptor.toWatch.getPath());
+    try {
+      while (adaptor.tailFile(eq)) {
+        if (log.isDebugEnabled()) {
+          log.debug("Terminator thread:" + adaptor.toWatch.getPath() + " still 
working");
         }
-       }
+        long now = System.currentTimeMillis();
+        if (now > endTime ) {
+          log.warn("TerminatorThread should have been finished by now! count=" 
+ count);
+          count ++;
+          endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+          if (count >3 ) {
+            log.warn("TerminatorThread should have been finished by now, 
stopping it now! count=" + count);
+            break;
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      log.info("InterruptedException on Terminator thread:" + 
adaptor.toWatch.getPath(),e);
+    } catch (Throwable e) {
+      log.warn("Exception on Terminator thread:" + 
adaptor.toWatch.getPath(),e);
+    }
+    
+    log.info("Terminator thread finished." + adaptor.toWatch.getPath());
+    try {
+      adaptor.reader.close();
+    } catch (Throwable ex) {
+      // do nothing
+    }
+  }
 }


Reply via email to