FLUME-1763. FileChannel checkpoints should not be done without free space (Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9c5a9a8d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9c5a9a8d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9c5a9a8d Branch: refs/heads/flume-1.3.0 Commit: 9c5a9a8d8f4bac82cec20505f16331b497f480c0 Parents: 02cc9d4 Author: Hari Shreedharan <[email protected]> Authored: Mon Dec 10 17:13:51 2012 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Dec 20 00:13:21 2012 -0800 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 7 ++ .../channel/file/FileChannelConfiguration.java | 11 +++ .../java/org/apache/flume/channel/file/Log.java | 32 +++++--- .../org/apache/flume/channel/file/TestLog.java | 68 ++++++++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + 5 files changed, 107 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9c5a9a8d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index d12ad9e..4bf480b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -77,6 +77,7 @@ public class FileChannel extends BasicChannelSemantics { private int transactionCapacity; private long checkpointInterval; private long maxFileSize; + private long minimumRequiredSpace; private File checkpointDir; private File[] dataDirs; private Log log; @@ -172,6 +173,11 @@ public class FileChannel extends BasicChannelSemantics { FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE), FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE); + minimumRequiredSpace = Math.max( + context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE, + FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE), + FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE); + logWriteTimeout = context.getInteger( FileChannelConfiguration.LOG_WRITE_TIMEOUT, FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT); @@ -256,6 +262,7 @@ public class FileChannel extends BasicChannelSemantics { Builder builder = new Log.Builder(); builder.setCheckpointInterval(checkpointInterval); builder.setMaxFileSize(maxFileSize); + builder.setMinimumRequiredSpace(minimumRequiredSpace); builder.setQueueSize(capacity); builder.setLogWriteTimeout(logWriteTimeout); builder.setCheckpointDir(checkpointDir); http://git-wip-us.apache.org/repos/asf/flume/blob/9c5a9a8d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java index 92cad77..24368b3 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java @@ -45,6 +45,17 @@ public class FileChannelConfiguration { public static final String MAX_FILE_SIZE = "maxFileSize"; public static final long DEFAULT_MAX_FILE_SIZE = Integer.MAX_VALUE - (500L * 1024L * 1024L); // ~1.52 G + + public static final String MINIMUM_REQUIRED_SPACE = "minimumRequiredSpace"; + /** + * Minimum space required defaults to 500MB + */ + public static final long DEFAULT_MINIMUM_REQUIRED_SPACE = 500L * 1024L * 1024L; + /** + * Minimum space floor is 1MB + */ + public static final long FLOOR_MINIMUM_REQUIRED_SPACE = 1L * 1024L * 1024L; + /** * Maximum capacity of the channel. * Default: 1,000,000 http://git-wip-us.apache.org/repos/asf/flume/blob/9c5a9a8d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 7906d30..829e35a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -76,10 +76,6 @@ class Log { private static final Logger LOGGER = LoggerFactory.getLogger(Log.class); private static final int MIN_NUM_LOGS = 2; private static final String FILE_LOCK = "in_use.lock"; - /** - * Each file system in use must have at least 10MB of space. - */ - private static final long ABSOLUTE_MINIMUM_REQURED_SPACE = 10L * 1024L * 1024L; // for reader private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections .synchronizedMap(new HashMap<Integer, LogFile.RandomReader>()); @@ -96,6 +92,7 @@ class Log { private long checkpointInterval; private long maxFileSize; private final boolean useFastReplay; + private final long minimumRequiredSpace; private final Map<String, FileLock> locks; private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(true); @@ -118,6 +115,7 @@ class Log { static class Builder { private long bCheckpointInterval; + private long bMinimumRequiredSpace; private long bMaxFileSize; private int bQueueCapacity; private File bCheckpointDir; @@ -168,6 +166,11 @@ class Log { return this; } + Builder setMinimumRequiredSpace(long minimumRequiredSpace) { + bMinimumRequiredSpace = minimumRequiredSpace; + return this; + } + Builder setCheckpointWriteTimeout(int checkpointTimeout){ bCheckpointWriteTimeout = checkpointTimeout; return this; @@ -201,15 +204,16 @@ class Log { Log build() throws IOException { return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity, bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName, - useLogReplayV1, useFastReplay, bEncryptionKeyProvider, - bEncryptionKeyAlias, bEncryptionCipherProvider, bLogDirs); + useLogReplayV1, useFastReplay, bMinimumRequiredSpace, + bEncryptionKeyProvider, bEncryptionKeyAlias, + bEncryptionCipherProvider, bLogDirs); } } private Log(long checkpointInterval, long maxFileSize, int queueCapacity, int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir, String name, boolean useLogReplayV1, boolean useFastReplay, - @Nullable KeyProvider encryptionKeyProvider, + long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider, @Nullable String encryptionKeyAlias, @Nullable String encryptionCipherProvider, File... logDirs) throws IOException { @@ -229,6 +233,7 @@ class Log { this.channelNameDescriptor = "[channel=" + name + "]"; this.useLogReplayV1 = useLogReplayV1; this.useFastReplay = useFastReplay; + this.minimumRequiredSpace = minimumRequiredSpace; for (File logDir : logDirs) { Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(), "LogDir " + logDir + " could not be created"); @@ -467,7 +472,7 @@ class Log { ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put); int logFileIndex = nextLogWriter(transactionID); long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); - long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + long requiredSpace = minimumRequiredSpace + buffer.limit(); if(usableSpace <= requiredSpace) { throw new IOException("Usable space exhaused, only " + usableSpace + " bytes remaining, required " + requiredSpace + " bytes"); @@ -510,7 +515,7 @@ class Log { ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take); int logFileIndex = nextLogWriter(transactionID); long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); - long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + long requiredSpace = minimumRequiredSpace + buffer.limit(); if(usableSpace <= requiredSpace) { throw new IOException("Usable space exhaused, only " + usableSpace + " bytes remaining, required " + requiredSpace + " bytes"); @@ -552,7 +557,7 @@ class Log { ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback); int logFileIndex = nextLogWriter(transactionID); long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); - long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + long requiredSpace = minimumRequiredSpace + buffer.limit(); if(usableSpace <= requiredSpace) { throw new IOException("Usable space exhaused, only " + usableSpace + " bytes remaining, required " + requiredSpace + " bytes"); @@ -718,7 +723,7 @@ class Log { ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit); int logFileIndex = nextLogWriter(transactionID); long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); - long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit(); + long requiredSpace = minimumRequiredSpace + buffer.limit(); if(usableSpace <= requiredSpace) { throw new IOException("Usable space exhaused, only " + usableSpace + " bytes remaining, required " + requiredSpace + " bytes"); @@ -830,6 +835,11 @@ class Log { */ private Boolean writeCheckpoint(Boolean force) throws Exception { boolean checkpointCompleted = false; + long usableSpace = checkpointDir.getUsableSpace(); + if(usableSpace <= minimumRequiredSpace) { + throw new IOException("Usable space exhaused, only " + usableSpace + + " bytes remaining, required " + minimumRequiredSpace + " bytes"); + } boolean lockAcquired = tryLockExclusive(); if(!lockAcquired) { return false; http://git-wip-us.apache.org/repos/asf/flume/blob/9c5a9a8d/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index a165d6a..bc7b3cf 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -19,6 +19,7 @@ package org.apache.flume.channel.file; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.List; @@ -27,11 +28,14 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.io.Files; public class TestLog { + private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class); private static final long MAX_FILE_SIZE = 1000; private static final int CAPACITY = 10000; private Log log; @@ -144,7 +148,69 @@ public class TestLog { FlumeEventQueue queue = log.getFlumeEventQueue(); Assert.assertNull(queue.removeHead(transactionID)); } - + @Test + public void testMinimumRequiredSpaceTooSmallOnStartup() throws IOException, + InterruptedException { + log.close(); + log = new Log.Builder().setCheckpointInterval( + Long.MAX_VALUE).setMaxFileSize( + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( + CAPACITY).setCheckpointDir(checkpointDir).setLogDirs( + dataDirs).setChannelName("testlog"). + setMinimumRequiredSpace(Long.MAX_VALUE).build(); + try { + log.replay(); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage(), e.getMessage() + .startsWith("Usable space exhaused")); + } + } + /** + * There is a race here in that someone could take up some space + */ + @Test + public void testMinimumRequiredSpaceTooSmallForPut() throws IOException, + InterruptedException { + try { + doTestMinimumRequiredSpaceTooSmallForPut(); + } catch (IOException e) { + LOGGER.info("Error during test, retrying", e); + doTestMinimumRequiredSpaceTooSmallForPut(); + } catch (AssertionError e) { + LOGGER.info("Test failed, let's be sure it failed for good reason", e); + doTestMinimumRequiredSpaceTooSmallForPut(); + } + } + public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException, + InterruptedException { + long minimumRequireSpace = checkpointDir.getUsableSpace() - + (10L* 1024L * 1024L); + log.close(); + log = new Log.Builder().setCheckpointInterval( + Long.MAX_VALUE).setMaxFileSize( + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( + CAPACITY).setCheckpointDir(checkpointDir).setLogDirs( + dataDirs).setChannelName("testlog"). + setMinimumRequiredSpace(minimumRequireSpace).build(); + log.replay(); + File filler = new File(checkpointDir, "filler"); + byte[] buffer = new byte[64 * 1024]; + FileOutputStream out = new FileOutputStream(filler); + while(checkpointDir.getUsableSpace() > minimumRequireSpace) { + out.write(buffer); + } + out.close(); + try { + FlumeEvent eventIn = TestUtils.newPersistableEvent(); + long transactionID = ++this.transactionID; + log.put(transactionID, eventIn); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue(e.getMessage(), e.getMessage() + .startsWith("Usable space exhaused")); + } + } /** * After replay of the log, we should not find the event because the take * was committed http://git-wip-us.apache.org/repos/asf/flume/blob/9c5a9a8d/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 2388087..54caf33 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1665,6 +1665,7 @@ dataDirs ~/.flume/file-channel/data transactionCapacity 1000 The maximum size of transaction supported by the channel checkpointInterval 30000 Amount of time (in millis) between checkpoints maxFileSize 2146435071 Max size (in bytes) of a single log file +minimumRequiredSpace 524288000 Minimum Required free space (in bytes) capacity 1000000 Maximum capacity of the channel keep-alive 3 Amount of time (in sec) to wait for a put operation write-timeout 3 Amount of time (in sec) to wait for a write operation
