Updated Branches: refs/heads/trunk 781ec3fcd -> dad3d735e
FLUME-1609. FileChannel detecting when the underlying file systems are full could provide cleaner error recovery. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dad3d735 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dad3d735 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dad3d735 Branch: refs/heads/trunk Commit: dad3d735eb91cf89988dd6a6772fbecaa839315b Parents: 781ec3f Author: Hari Shreedharan <[email protected]> Authored: Tue Oct 16 14:36:44 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Oct 16 14:36:44 2012 -0700 ---------------------------------------------------------------------- .../java/org/apache/flume/channel/file/Log.java | 28 +++++++++++++++ .../org/apache/flume/channel/file/LogFile.java | 5 +++ 2 files changed, 33 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/dad3d735/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 64725dd..d68a601 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -69,6 +69,10 @@ class Log { private static final Logger LOGGER = LoggerFactory.getLogger(Log.class); private static final int MIN_NUM_LOGS = 2; private static final String FILE_LOCK = "in_use.lock"; + /** + * Each file system in use must have at least 10MB of space. + */ + private static final long ABSOLUTE_MINIMUM_REQURED_SPACE = 10L * 1024L * 1024L; // for reader private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections .synchronizedMap(new HashMap<Integer, LogFile.RandomReader>()); @@ -426,6 +430,12 @@ class Log { Put put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put); int logFileIndex = nextLogWriter(transactionID); + long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); + long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + if(usableSpace <= requiredSpace) { + throw new IOException("Usable space exhaused, only " + usableSpace + + " bytes remaining, required " + requiredSpace + " bytes"); + } boolean error = true; try { try { @@ -463,6 +473,12 @@ class Log { pointer.getOffset(), pointer.getFileID()); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take); int logFileIndex = nextLogWriter(transactionID); + long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); + long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + if(usableSpace <= requiredSpace) { + throw new IOException("Usable space exhaused, only " + usableSpace + + " bytes remaining, required " + requiredSpace + " bytes"); + } boolean error = true; try { try { @@ -499,6 +515,12 @@ class Log { Rollback rollback = new Rollback(transactionID, WriteOrderOracle.next()); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback); int logFileIndex = nextLogWriter(transactionID); + long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); + long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + if(usableSpace <= requiredSpace) { + throw new IOException("Usable space exhaused, only " + usableSpace + + " bytes remaining, required " + requiredSpace + " bytes"); + } boolean error = true; try { try { @@ -655,6 +677,12 @@ class Log { Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit); int logFileIndex = nextLogWriter(transactionID); + long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); + long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + if(usableSpace <= requiredSpace) { + throw new IOException("Usable space exhaused, only " + usableSpace + + " bytes remaining, required " + requiredSpace + " bytes"); + } boolean error = true; try { try { http://git-wip-us.apache.org/repos/asf/flume/blob/dad3d735/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java index a2c790c..8089ff3 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java @@ -154,6 +154,11 @@ abstract class LogFile { String getParent() { return file.getParent(); } + + long getUsableSpace() { + return file.getUsableSpace(); + } + long getMaxSize() { return maxFileSize; }
