FLUME-1417: File Channel checkpoint can be bad, leading to the channel being unable to start
(Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/444b75af Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/444b75af Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/444b75af Branch: refs/heads/cdh-1.2.0+24_intuit Commit: 444b75af93d37e4b4a6a082e4e6e6009d4f57b48 Parents: 62ee05b Author: Brock Noland <[email protected]> Authored: Sun Aug 5 19:11:59 2012 -0500 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 7 13:09:49 2012 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/file/FlumeEventQueue.java | 13 +++-- .../java/org/apache/flume/channel/file/Log.java | 37 ++++++++++----- 2 files changed, 33 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/444b75af/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java index 64d3dec..e692934 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java @@ -26,11 +26,9 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel.MapMode; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; @@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory; import 
com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import java.util.SortedSet; +import java.util.TreeSet; /** * Queue of events in the channel. This queue stores only @@ -296,12 +296,15 @@ class FlumeEventQueue { return false; } /** - * @return the set of fileIDs which are currently on the queue + * @return a copy of the set of fileIDs which are currently on the queue * will be normally be used when deciding which data files can * be deleted */ - synchronized Set<Integer> getFileIDs() { - return new HashSet<Integer>(fileIDCounts.keySet()); + synchronized SortedSet<Integer> getFileIDs() { + //Java implements clone pretty well. The main place this is used + //in checkpointing and deleting old files, so best + //to use a sorted set implementation. + return new TreeSet<Integer>(fileIDCounts.keySet()); } protected void incrementFileID(int fileID) { http://git-wip-us.apache.org/repos/asf/flume/blob/444b75af/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 64a70c8..778db64 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -48,6 +47,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.SortedSet; /** * Stores 
FlumeEvents on disk and pointers to the events in a in memory queue. @@ -706,6 +706,7 @@ class Log { private boolean writeCheckpoint(boolean force) throws IOException { boolean lockAcquired = false; + boolean checkpointCompleted = false; try { lockAcquired = checkpointWriterLock.tryLock(this.checkpointWriteTimeout, TimeUnit.SECONDS); @@ -716,11 +717,19 @@ class Log { if(!lockAcquired) { return false; } + SortedSet<Integer> idSet = null; try { if (queue.checkpoint(force) || force) { long ts = queue.getTimestamp(); - Set<Integer> idSet = queue.getFileIDs(); + //Since the active files might also be in the queue's fileIDs, + //we need to either move each one to a new set or remove each one + //as we do here. Otherwise we cannot make sure every element in + //fileID set from the queue have been updated. + //Since clone is smarter than insert, better to make + //a copy of the set first so that we can use it later. + idSet = queue.getFileIDs(); + SortedSet<Integer> idSetToCompare = new TreeSet(idSet); int numFiles = logFiles.length(); for (int i = 0; i < numFiles; i++) { @@ -749,25 +758,32 @@ class Log { idIterator.remove(); } Preconditions.checkState(idSet.size() == 0, - "Could not update all data file timestamps: " + idSet); + "Could not update all data file timestamps: " + idSet); + //Add files from all log directories + for (int index = 0; index < logDirs.length; index++) { + idSetToCompare.add(logFiles.get(index).getFileID()); + } + idSet = idSetToCompare; + checkpointCompleted = true; } } finally { checkpointWriterLock.unlock(); } + //Do the deletes outside the checkpointWriterLock + //Delete logic is expensive. + if (open && checkpointCompleted) { + removeOldLogs(idSet); + } //Since the exception is not caught, this will not be returned if //an exception is thrown from the try. 
return true; } - private void removeOldLogs() { + private void removeOldLogs(SortedSet<Integer> fileIDs) { Preconditions.checkState(open, "Log is closed"); // we will find the smallest fileID currently in use and // won't delete any files with an id larger than the min - Set<Integer> fileIDs = new TreeSet<Integer>(queue.getFileIDs()); - for (int index = 0; index < logDirs.length; index++) { - fileIDs.add(logFiles.get(index).getFileID()); - } - int minFileID = Collections.min(fileIDs); + int minFileID = fileIDs.first(); LOGGER.debug("Files currently in use: " + fileIDs); for(File logDir : logDirs) { List<File> logs = LogUtils.getLogs(logDir); @@ -895,9 +911,6 @@ class Log { } } } - if(log.open) { - log.removeOldLogs(); - } } catch (IOException e) { LOG.error("Error doing checkpoint", e); } catch (Exception e) {
