FLUME-1773: File Channel worker thread should not be daemon (Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/02cc9d4a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/02cc9d4a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/02cc9d4a Branch: refs/heads/flume-1.3.0 Commit: 02cc9d4aeef96ef306292b59185c8f15439175b7 Parents: 31ebd3c Author: Brock Noland <[email protected]> Authored: Mon Dec 10 17:42:34 2012 -0600 Committer: Hari Shreedharan <[email protected]> Committed: Thu Dec 20 00:13:07 2012 -0800 ---------------------------------------------------------------------- .../java/org/apache/flume/channel/file/Log.java | 74 ++++++--------- 1 files changed, 31 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/02cc9d4a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index ea98e5d..7906d30 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -42,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.annotation.Nullable; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.annotations.InterfaceAudience; @@ -83,10 +86,11 @@ class Log { private final AtomicInteger nextFileID = new AtomicInteger(0); private final File checkpointDir; private final File[] logDirs; - private final BackgroundWorker worker; private final int queueCapacity; private final AtomicReferenceArray<LogFile.Writer> logFiles; + private final ScheduledExecutorService workerExecutor; + private volatile boolean open; private FlumeEventQueue queue; private long checkpointInterval; @@ -262,7 +266,7 @@ class Log { encryptionCipherProvider); } open = false; - this.checkpointInterval = checkpointInterval; + this.checkpointInterval = Math.max(checkpointInterval, 1000); this.maxFileSize = maxFileSize; this.queueCapacity = queueCapacity; this.checkpointDir = checkpointDir; @@ -270,10 +274,12 @@ class Log { this.logWriteTimeout = logWriteTimeout; this.checkpointWriteTimeout = checkpointWriteTimeout; logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length); - worker = new BackgroundWorker(this); - worker.setName("Log-BackgroundWorker-" + name); - worker.setDaemon(true); - worker.start(); + workerExecutor = Executors.newSingleThreadScheduledExecutor(new + ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name) + .build()); + workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this), + this.checkpointInterval, this.checkpointInterval, + TimeUnit.MILLISECONDS); } /** @@ -646,10 +652,7 @@ class Log { lockExclusive(); try { open = false; - if (worker != null) { - worker.shutdown(); - worker.interrupt(); - } + shutdownWorker(); if (logFiles != null) { for (int index = 0; index < logFiles.length(); index++) { LogFile.Writer writer = logFiles.get(index); @@ -684,9 +687,16 @@ class Log { } } - synchronized void shutdownWorker() { - Preconditions.checkNotNull(worker, "worker"); - worker.shutdown(); + void shutdownWorker() { + String msg = "Attempting to shutdown background worker."; + System.out.println(msg); + LOGGER.info(msg); + workerExecutor.shutdown(); + try { + workerExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("Interrupted while waiting for worker to die."); + } } void setCheckpointInterval(long checkpointInterval) { this.checkpointInterval = checkpointInterval; @@ -997,7 +1007,7 @@ class Log { lock.channel().close(); lock = null; } - static class BackgroundWorker extends Thread { + static class BackgroundWorker implements Runnable { private static final Logger LOG = LoggerFactory .getLogger(BackgroundWorker.class); private final Log log; @@ -1007,38 +1017,16 @@ class Log { this.log = log; } - void shutdown() { - if(run) { - run = false; - } - } - @Override public void run() { - long lastCheckTime = 0L; - while (run) { - try { - try { - Thread.sleep(Math.max(1000L, log.checkpointInterval / 10L)); - } catch (InterruptedException e) { - // recheck run flag - continue; - } - if(log.open) { - // check to see if we should do a checkpoint - long currentTime = System.currentTimeMillis(); - long elapsed = currentTime - lastCheckTime; - if (elapsed > log.checkpointInterval) { - if(log.writeCheckpoint()) { - lastCheckTime = currentTime; - } - } - } - } catch (IOException e) { - LOG.error("Error doing checkpoint", e); - } catch (Exception e) { - LOG.error("General error in checkpoint worker", e); + try { + if (log.open) { + log.writeCheckpoint(); } + } catch (IOException e) { + LOG.error("Error doing checkpoint", e); + } catch (Exception e) { + LOG.error("General error in checkpoint worker", e); } } }
