Updated Branches: refs/heads/flume-1.4 4d9ef89ca -> cd3cf7ef9
FLUME-1824: Inflights can complete successfully even if checkpoint fails (Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cd3cf7ef
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cd3cf7ef
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cd3cf7ef

Branch: refs/heads/flume-1.4
Commit: cd3cf7ef9f0af50e4f0c2a544b1258248290b24c
Parents: 4d9ef89
Author: Brock Noland <[email protected]>
Authored: Tue Jan 8 11:01:31 2013 -0600
Committer: Brock Noland <[email protected]>
Committed: Tue Jan 8 11:01:42 2013 -0600

----------------------------------------------------------------------
 .../flume/channel/file/EventQueueBackingStore.java | 1 +
 .../channel/file/EventQueueBackingStoreFile.java | 7 +++++--
 .../apache/flume/channel/file/FlumeEventQueue.java | 1 +
 3 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/cd3cf7ef/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
index 13b50da..b136eb0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -36,6 +36,7 @@ abstract class EventQueueBackingStore {
   }
+  abstract void beginCheckpoint() throws IOException;
   abstract void checkpoint() throws IOException;
   abstract void incrementFileID(int fileID);
   abstract void decrementFileID(int fileID);
http://git-wip-us.apache.org/repos/asf/flume/blob/cd3cf7ef/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 186b15a..7f35301 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -104,14 +104,17 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
   protected abstract void writeCheckpointMetaData() throws IOException;
   @Override
-  void checkpoint() throws IOException {
-
+  void beginCheckpoint() throws IOException {
     LOG.info("Start checkpoint for " + checkpointFile + ", elements to sync = " + overwriteMap.size());
     // Start checkpoint
     elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
     mappedBuffer.force();
+  }
+
+  @Override
+  void checkpoint() throws IOException {
     setLogWriteOrderID(WriteOrderOracle.next());
     LOG.info("Updating checkpoint metadata: logWriteOrderID: "

http://git-wip-us.apache.org/repos/asf/flume/blob/cd3cf7ef/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 74a2bc8..0f9456b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -101,6 +101,7 @@ final class FlumeEventQueue {
       LOG.debug("Checkpoint not required");
       return false;
     }
+    backingStore.beginCheckpoint();
     inflightPuts.serializeAndWrite();
     inflightTakes.serializeAndWrite();
     backingStore.checkpoint();
