FLUME-1773: File Channel worker thread should not be daemon

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/02cc9d4a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/02cc9d4a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/02cc9d4a

Branch: refs/heads/flume-1.3.0
Commit: 02cc9d4aeef96ef306292b59185c8f15439175b7
Parents: 31ebd3c
Author: Brock Noland <[email protected]>
Authored: Mon Dec 10 17:42:34 2012 -0600
Committer: Hari Shreedharan <[email protected]>
Committed: Thu Dec 20 00:13:07 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/channel/file/Log.java    |   74 ++++++---------
 1 files changed, 31 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/02cc9d4a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index ea98e5d..7906d30 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -33,6 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -42,6 +44,7 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import javax.annotation.Nullable;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.annotations.InterfaceAudience;
@@ -83,10 +86,11 @@ class Log {
   private final AtomicInteger nextFileID = new AtomicInteger(0);
   private final File checkpointDir;
   private final File[] logDirs;
-  private final BackgroundWorker worker;
   private final int queueCapacity;
   private final AtomicReferenceArray<LogFile.Writer> logFiles;
 
+  private final ScheduledExecutorService workerExecutor;
+
   private volatile boolean open;
   private FlumeEventQueue queue;
   private long checkpointInterval;
@@ -262,7 +266,7 @@ class Log {
           encryptionCipherProvider);
     }
     open = false;
-    this.checkpointInterval = checkpointInterval;
+    this.checkpointInterval = Math.max(checkpointInterval, 1000);
     this.maxFileSize = maxFileSize;
     this.queueCapacity = queueCapacity;
     this.checkpointDir = checkpointDir;
@@ -270,10 +274,12 @@ class Log {
     this.logWriteTimeout = logWriteTimeout;
     this.checkpointWriteTimeout = checkpointWriteTimeout;
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
-    worker = new BackgroundWorker(this);
-    worker.setName("Log-BackgroundWorker-" + name);
-    worker.setDaemon(true);
-    worker.start();
+    workerExecutor = Executors.newSingleThreadScheduledExecutor(new
+        ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
+        .build());
+    workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
+        this.checkpointInterval, this.checkpointInterval,
+        TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -646,10 +652,7 @@ class Log {
     lockExclusive();
     try {
       open = false;
-      if (worker != null) {
-        worker.shutdown();
-        worker.interrupt();
-      }
+      shutdownWorker();
       if (logFiles != null) {
         for (int index = 0; index < logFiles.length(); index++) {
           LogFile.Writer writer = logFiles.get(index);
@@ -684,9 +687,16 @@ class Log {
     }
   }
 
-  synchronized void shutdownWorker() {
-    Preconditions.checkNotNull(worker, "worker");
-    worker.shutdown();
+  void shutdownWorker() {
+    String msg = "Attempting to shutdown background worker.";
+    System.out.println(msg);
+    LOGGER.info(msg);
+    workerExecutor.shutdown();
+    try {
+      workerExecutor.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.error("Interrupted while waiting for worker to die.");
+    }
   }
   void setCheckpointInterval(long checkpointInterval) {
     this.checkpointInterval = checkpointInterval;
@@ -997,7 +1007,7 @@ class Log {
     lock.channel().close();
     lock = null;
   }
-  static class BackgroundWorker extends Thread {
+  static class BackgroundWorker implements Runnable {
     private static final Logger LOG = LoggerFactory
         .getLogger(BackgroundWorker.class);
     private final Log log;
@@ -1007,38 +1017,16 @@ class Log {
       this.log = log;
     }
 
-    void shutdown() {
-      if(run) {
-        run = false;
-      }
-    }
-
     @Override
     public void run() {
-      long lastCheckTime = 0L;
-      while (run) {
-        try {
-          try {
-            Thread.sleep(Math.max(1000L, log.checkpointInterval / 10L));
-          } catch (InterruptedException e) {
-            // recheck run flag
-            continue;
-          }
-          if(log.open) {
-            // check to see if we should do a checkpoint
-            long currentTime = System.currentTimeMillis();
-            long elapsed = currentTime - lastCheckTime;
-            if (elapsed > log.checkpointInterval) {
-              if(log.writeCheckpoint()) {
-                lastCheckTime = currentTime;
-              }
-            }
-          }
-        } catch (IOException e) {
-          LOG.error("Error doing checkpoint", e);
-        } catch (Exception e) {
-          LOG.error("General error in checkpoint worker", e);
+      try {
+        if (log.open) {
+          log.writeCheckpoint();
         }
+      } catch (IOException e) {
+        LOG.error("Error doing checkpoint", e);
+      } catch (Exception e) {
+        LOG.error("General error in checkpoint worker", e);
       }
     }
   }

Reply via email to