FLUME-1794. FileChannel should check for full disks in the background (Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7d9ec237 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7d9ec237 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7d9ec237 Branch: refs/heads/flume-1.3.0 Commit: 7d9ec2379dff8bc417619947dd979b084ff81a5b Parents: 4ad7bc5 Author: Hari Shreedharan <[email protected]> Authored: Tue Dec 18 16:30:56 2012 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Dec 20 14:49:01 2012 -0800 ---------------------------------------------------------------------- .../java/org/apache/flume/channel/file/Log.java | 21 ++++++-- .../org/apache/flume/channel/file/LogFile.java | 38 ++++++++++++++- .../apache/flume/channel/file/LogFileFactory.java | 5 +- .../org/apache/flume/channel/file/LogFileV2.java | 5 +- .../org/apache/flume/channel/file/LogFileV3.java | 6 ++- .../flume/channel/file/TestFileChannelRestart.java | 1 + .../org/apache/flume/channel/file/TestLog.java | 30 ++++++++++- .../org/apache/flume/channel/file/TestLogFile.java | 6 +- .../channel/file/TestTransactionEventRecordV3.java | 2 - 9 files changed, 93 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index af11dc5..8a4201c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -112,6 +112,7 @@ class Log { private String encryptionCipherProvider; private String 
encryptionKeyAlias; private Key encryptionKey; + private final long usableSpaceRefreshInterval; static class Builder { private long bCheckpointInterval; @@ -130,6 +131,12 @@ class Log { private KeyProvider bEncryptionKeyProvider; private String bEncryptionKeyAlias; private String bEncryptionCipherProvider; + private long bUsableSpaceRefreshInterval = 15L * 1000L; + + Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) { + bUsableSpaceRefreshInterval = usableSpaceRefreshInterval; + return this; + } Builder setCheckpointInterval(long interval) { bCheckpointInterval = interval; @@ -206,7 +213,8 @@ class Log { bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName, useLogReplayV1, useFastReplay, bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias, - bEncryptionCipherProvider, bLogDirs); + bEncryptionCipherProvider, bUsableSpaceRefreshInterval, + bLogDirs); } } @@ -215,13 +223,16 @@ class Log { String name, boolean useLogReplayV1, boolean useFastReplay, long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider, @Nullable String encryptionKeyAlias, - @Nullable String encryptionCipherProvider, File... logDirs) + @Nullable String encryptionCipherProvider, + long usableSpaceRefreshInterval, File... 
logDirs) throws IOException { Preconditions.checkArgument(checkpointInterval > 0, "checkpointInterval <= 0"); Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0"); Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0"); Preconditions.checkNotNull(checkpointDir, "checkpointDir"); + Preconditions.checkArgument(usableSpaceRefreshInterval > 0, + "usableSpaceRefreshInterval <= 0"); Preconditions.checkArgument( checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir " + checkpointDir + " could not be created"); @@ -234,6 +245,7 @@ class Log { this.useLogReplayV1 = useLogReplayV1; this.useFastReplay = useFastReplay; this.minimumRequiredSpace = minimumRequiredSpace; + this.usableSpaceRefreshInterval = usableSpaceRefreshInterval; for (File logDir : logDirs) { Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(), "LogDir " + logDir + " could not be created"); @@ -292,7 +304,6 @@ class Log { * directly before the shutdown or crash. * @throws IOException */ - @SuppressWarnings("deprecation") void replay() throws IOException { Preconditions.checkState(!open, "Cannot replay after Log has been opened"); @@ -406,6 +417,7 @@ class Log { } } + @SuppressWarnings("deprecation") private void doReplay(FlumeEventQueue queue, List<File> dataFiles, KeyProvider encryptionKeyProvider) throws Exception { CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles, @@ -801,7 +813,7 @@ class Log { File file = new File(logDirs[index], PREFIX + fileID); LogFile.Writer writer = LogFileFactory.getWriter(file, fileID, maxFileSize, encryptionKey, encryptionKeyAlias, - encryptionCipherProvider); + encryptionCipherProvider, usableSpaceRefreshInterval); idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file, encryptionKeyProvider)); // writer from this point on will get new reference @@ -1021,7 +1033,6 @@ class Log { private static final Logger LOG = LoggerFactory .getLogger(BackgroundWorker.class); private final Log log; - private 
volatile boolean run = true; public BackgroundWorker(Log log) { this.log = log; http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java index 8089ff3..1db3717 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java @@ -27,6 +27,7 @@ import java.nio.channels.FileChannel; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -36,6 +37,7 @@ import org.apache.flume.tools.DirectMemoryUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -116,6 +118,34 @@ abstract class LogFile { } } + @VisibleForTesting + static class CachedFSUsableSpace { + private final File fs; + private final long interval; + private final AtomicLong lastRefresh; + private final AtomicLong value; + + CachedFSUsableSpace(File fs, long interval) { + this.fs = fs; + this.interval = interval; + this.value = new AtomicLong(fs.getUsableSpace()); + this.lastRefresh = new AtomicLong(System.currentTimeMillis()); + } + + void decrement(long numBytes) { + Preconditions.checkArgument(numBytes >= 0, "numBytes less than zero"); + value.addAndGet(-numBytes); + } + long getUsableSpace() { + long now = System.currentTimeMillis(); + if(now - interval > lastRefresh.get()) { 
+ value.set(fs.getUsableSpace()); + lastRefresh.set(now); + } + return Math.max(value.get(), 0L); + } + } + static abstract class Writer { private final int logFileID; private final File file; @@ -123,10 +153,12 @@ abstract class LogFile { private final RandomAccessFile writeFileHandle; private final FileChannel writeFileChannel; private final CipherProvider.Encryptor encryptor; + private final CachedFSUsableSpace usableSpace; private volatile boolean open; + Writer(File file, int logFileID, long maxFileSize, - CipherProvider.Encryptor encryptor) + CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval) throws IOException { this.file = file; this.logFileID = logFileID; @@ -135,6 +167,7 @@ abstract class LogFile { this.encryptor = encryptor; writeFileHandle = new RandomAccessFile(file, "rw"); writeFileChannel = writeFileHandle.getChannel(); + usableSpace = new CachedFSUsableSpace(file, usableSpaceRefreshInterval); LOG.info("Opened " + file); open = true; } @@ -156,7 +189,7 @@ abstract class LogFile { } long getUsableSpace() { - return file.getUsableSpace(); + return usableSpace.getUsableSpace(); } long getMaxSize() { @@ -205,6 +238,7 @@ abstract class LogFile { Preconditions.checkState(offset >= 0, String.valueOf(offset)); // OP_RECORD + size + buffer int recordLength = 1 + (int)Serialization.SIZE_OF_INT + buffer.limit(); + usableSpace.decrement(recordLength); preallocate(recordLength); ByteBuffer toWrite = ByteBuffer.allocate(recordLength); toWrite.put(OP_RECORD); http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java index 1fe219a..9c98d8c 100644 --- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java @@ -67,13 +67,14 @@ class LogFileFactory { static LogFile.Writer getWriter(File file, int logFileID, long maxFileSize, @Nullable Key encryptionKey, @Nullable String encryptionKeyAlias, - @Nullable String encryptionCipherProvider) throws IOException { + @Nullable String encryptionCipherProvider, + long usableSpaceRefreshInterval) throws IOException { Preconditions.checkState(!file.exists(), "File already exists " + file.getAbsolutePath()); Preconditions.checkState(file.createNewFile(), "File could not be created " + file.getAbsolutePath()); return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey, - encryptionKeyAlias, encryptionCipherProvider); + encryptionKeyAlias, encryptionCipherProvider, usableSpaceRefreshInterval); } static LogFile.RandomReader getRandomReader(File file, http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java index 4c593a4..f286c57 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java @@ -93,9 +93,10 @@ class LogFileV2 extends LogFile { static class Writer extends LogFile.Writer { - Writer(File file, int logFileID, long maxFileSize) + Writer(File file, int logFileID, long maxFileSize, + long usableSpaceRefreshInterval) throws IOException { - super(file, logFileID, maxFileSize, null); + super(file, 
logFileID, maxFileSize, null, usableSpaceRefreshInterval); RandomAccessFile writeFileHandle = getFileHandle(); writeFileHandle.writeInt(getVersion()); writeFileHandle.writeInt(logFileID); http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index aac7805..f51935c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -169,10 +169,12 @@ class LogFileV3 extends LogFile { Writer(File file, int logFileID, long maxFileSize, @Nullable Key encryptionKey, @Nullable String encryptionKeyAlias, - @Nullable String encryptionCipherProvider) + @Nullable String encryptionCipherProvider, + long usableSpaceRefreshInterval) throws IOException { super(file, logFileID, maxFileSize, CipherProviderFactory. 
- getEncrypter(encryptionCipherProvider, encryptionKey)); + getEncrypter(encryptionCipherProvider, encryptionKey), + usableSpaceRefreshInterval); ProtosFactory.LogFileMetaData.Builder metaDataBuilder = ProtosFactory.LogFileMetaData.newBuilder(); if(encryptionKey != null) { http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index 3d5bf59..ea57cdb 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -311,6 +311,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { File inflight = new File(checkpointDir, name); RandomAccessFile writer = new RandomAccessFile(inflight, "rw"); writer.write(new Random().nextInt()); + writer.close(); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index f9dbba5..6751714 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -18,6 +18,7 @@ */ package org.apache.flume.channel.file; +import static org.mockito.Mockito.*; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -184,7 +185,7 @@ public class TestLog { } public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException, InterruptedException { - long minimumRequireSpace = checkpointDir.getUsableSpace() - + long minimumRequiredSpace = checkpointDir.getUsableSpace() - (10L* 1024L * 1024L); log.close(); log = new Log.Builder().setCheckpointInterval( @@ -192,12 +193,13 @@ public class TestLog { FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( CAPACITY).setCheckpointDir(checkpointDir).setLogDirs( dataDirs).setChannelName("testlog"). - setMinimumRequiredSpace(minimumRequireSpace).build(); + setMinimumRequiredSpace(minimumRequiredSpace) + .setUsableSpaceRefreshInterval(1L).build(); log.replay(); File filler = new File(checkpointDir, "filler"); byte[] buffer = new byte[64 * 1024]; FileOutputStream out = new FileOutputStream(filler); - while(checkpointDir.getUsableSpace() > minimumRequireSpace) { + while(checkpointDir.getUsableSpace() > minimumRequiredSpace) { out.write(buffer); } out.close(); @@ -436,6 +438,28 @@ public class TestLog { Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody()); } + @Test + public void testCachedFSUsableSpace() throws Exception { + File fs = mock(File.class); + when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE); + LogFile.CachedFSUsableSpace cachedFS = + new LogFile.CachedFSUsableSpace(fs, 1000L); + Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE); + cachedFS.decrement(Integer.MAX_VALUE); + Assert.assertEquals(cachedFS.getUsableSpace(), + Long.MAX_VALUE - Integer.MAX_VALUE); + try { + cachedFS.decrement(-1); + Assert.fail(); + } catch (IllegalArgumentException expected) 
{ + + } + when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE - 1L); + Thread.sleep(1100); + Assert.assertEquals(cachedFS.getUsableSpace(), + Long.MAX_VALUE - 1L); + } + private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn) throws IOException, InterruptedException { FlumeEventQueue queue = log.getFlumeEventQueue(); http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index 9e28599..bef22ef 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -55,7 +55,7 @@ public class TestLogFile { dataFile = new File(dataDir, String.valueOf(fileID)); Assert.assertTrue(dataDir.isDirectory()); logFileWriter = LogFileFactory.getWriter(dataFile, fileID, - Integer.MAX_VALUE, null, null, null); + Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE); } @After public void cleanup() throws IOException { @@ -72,7 +72,7 @@ public class TestLogFile { Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile()); try { LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, - null); + null, Long.MAX_VALUE); Assert.fail(); } catch (IllegalStateException e) { Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), @@ -86,7 +86,7 @@ public class TestLogFile { Assert.assertTrue(dataFile.mkdirs()); try { LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, - null); + null, Long.MAX_VALUE); Assert.fail(); } catch (IllegalStateException e) { Assert.assertEquals("File already 
exists " + dataFile.getAbsolutePath(), http://git-wip-us.apache.org/repos/asf/flume/blob/7d9ec237/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java index 274ee7b..f403422 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java @@ -20,9 +20,7 @@ package org.apache.flume.channel.file; import static org.mockito.Mockito.*; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap;
