Updated Branches: refs/heads/trunk 21c67ed59 -> 1a2e0d7a7
FLUME-1824: Inflights can complete successfully even if checkpoint fails (Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1a2e0d7a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1a2e0d7a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1a2e0d7a

Branch: refs/heads/trunk
Commit: 1a2e0d7a7629eb32821c780d48a2c17f4e76a59e
Parents: 21c67ed
Author: Brock Noland <[email protected]>
Authored: Tue Jan 8 11:01:31 2013 -0600
Committer: Brock Noland <[email protected]>
Committed: Tue Jan 8 11:01:31 2013 -0600

----------------------------------------------------------------------
 .../flume/channel/file/EventQueueBackingStore.java | 1 +
 .../channel/file/EventQueueBackingStoreFile.java | 7 +++++--
 .../apache/flume/channel/file/FlumeEventQueue.java | 1 +
 3 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
index 13b50da..b136eb0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -36,6 +36,7 @@ abstract class EventQueueBackingStore {
   }

+  abstract void beginCheckpoint() throws IOException;
   abstract void checkpoint() throws IOException;
   abstract void incrementFileID(int fileID);
   abstract void decrementFileID(int fileID);
http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 186b15a..7f35301 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -104,14 +104,17 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
   protected abstract void writeCheckpointMetaData() throws IOException;

   @Override
-  void checkpoint() throws IOException {
-
+  void beginCheckpoint() throws IOException {
     LOG.info("Start checkpoint for " + checkpointFile +
         ", elements to sync = " + overwriteMap.size());

     // Start checkpoint
     elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
     mappedBuffer.force();
+  }
+
+  @Override
+  void checkpoint() throws IOException {

     setLogWriteOrderID(WriteOrderOracle.next());
     LOG.info("Updating checkpoint metadata: logWriteOrderID: "

http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 74a2bc8..0f9456b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -101,6 +101,7 @@ final class FlumeEventQueue {
       LOG.debug("Checkpoint not required");
       return false;
     }
+    backingStore.beginCheckpoint();
     inflightPuts.serializeAndWrite();
     inflightTakes.serializeAndWrite();
     backingStore.checkpoint();
