Author: eyang
Date: Fri Feb 27 00:15:16 2009
New Revision: 748357
URL: http://svn.apache.org/viewvc?rev=748357&view=rev
Log:
HADOOP-5313. Added 10 minutes limit for terminator thread to finish.
Removed static for adaptor.
Removed static for ChunkReceiver.
Removed log(" ") from the busy loop.
Add debug logging.
(Contribute by Jerome Boulon via eyang)
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=748357&r1=748356&r2=748357&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
Fri Feb 27 00:15:16 2009
@@ -1,38 +1,52 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
-import java.io.IOException;
-import java.util.TimerTask;
-
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
-import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;
-import org.apache.hadoop.chukwa.inputtools.plugin.metrics.ExecHelper;
import org.apache.log4j.Logger;
public class TerminatorThread extends Thread {
private static Logger log =Logger.getLogger(FileAdaptor.class);
- private static FileTailingAdaptor adaptor = null;
- private static ChunkReceiver eq = null;
+
+ private FileTailingAdaptor adaptor = null;
+ private ChunkReceiver eq = null;
public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
this.adaptor = adaptor;
this.eq = eq;
}
- public void run() {
- log.info("Terminator thread started.");
- try {
- while(adaptor.tailFile(eq)) {
- log.info("");
- }
- } catch (InterruptedException e) {
- log.info("Unable to send data to collector for 60
seconds, force shutdown.");
- }
- log.info("Terminator finished.");
- try {
- adaptor.reader.close();
- } catch (IOException ex) {
-
+ public void run() {
+
+ long endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+ int count = 0;
+ log.info("Terminator thread started." + adaptor.toWatch.getPath());
+ try {
+ while (adaptor.tailFile(eq)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Terminator thread:" + adaptor.toWatch.getPath() + " still
working");
}
- }
+ long now = System.currentTimeMillis();
+ if (now > endTime ) {
+ log.warn("TerminatorThread should have been finished by now! count="
+ count);
+ count ++;
+ endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+ if (count >3 ) {
+ log.warn("TerminatorThread should have been finished by now,
stopping it now! count=" + count);
+ break;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("InterruptedException on Terminator thread:" +
adaptor.toWatch.getPath(),e);
+ } catch (Throwable e) {
+ log.warn("Exception on Terminator thread:" +
adaptor.toWatch.getPath(),e);
+ }
+
+ log.info("Terminator thread finished." + adaptor.toWatch.getPath());
+ try {
+ adaptor.reader.close();
+ } catch (Throwable ex) {
+ // do nothing
+ }
+ }
}