Updated Branches: refs/heads/trunk ba0b2685b -> b4ddd5829
FLUME-2307. Remove Log writetimeout (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b4ddd582 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b4ddd582 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b4ddd582 Branch: refs/heads/trunk Commit: b4ddd5829897f758f869a5fc3b08dcbf4b55156a Parents: ba0b268 Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Feb 10 13:23:49 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Feb 10 13:23:49 2014 -0800 ---------------------------------------------------------------------- .../apache/flume/channel/file/FileChannel.java | 82 +++-------------- .../channel/file/FileChannelConfiguration.java | 13 --- .../java/org/apache/flume/channel/file/Log.java | 96 +++++--------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 - 4 files changed, 38 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index 2cd7f03..71b26f7 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -90,8 +90,6 @@ public class FileChannel extends BasicChannelSemantics { private Semaphore queueRemaining; private final ThreadLocal<FileBackedTransaction> transactions = new ThreadLocal<FileBackedTransaction>(); - private int logWriteTimeout; - private int checkpointWriteTimeout; private String channelNameDescriptor = "[channel=unknown]"; private ChannelCounter channelCounter; private boolean useLogReplayV1; @@ -190,39 +188,14 @@ public class FileChannel extends BasicChannelSemantics { // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE maxFileSize = Math.min( - context.getLong(FileChannelConfiguration.MAX_FILE_SIZE, - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE), - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE); + context.getLong(FileChannelConfiguration.MAX_FILE_SIZE, + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE), + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE); minimumRequiredSpace = Math.max( - context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE, - FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE), - FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE); - - logWriteTimeout = context.getInteger( - FileChannelConfiguration.LOG_WRITE_TIMEOUT, - FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT); - - if (logWriteTimeout < 0) { - LOG.warn("Log write time out is invalid: " + logWriteTimeout - + ", using default: " - + FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT); - - logWriteTimeout = FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT; - } - - checkpointWriteTimeout = context.getInteger( - FileChannelConfiguration.CHECKPOINT_WRITE_TIMEOUT, - FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT); - - if (checkpointWriteTimeout < 0) { - LOG.warn("Checkpoint write time out is invalid: " + checkpointWriteTimeout - + ", using default: " - + FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT); - - checkpointWriteTimeout = - FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT; - } + context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE, + FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE), + FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE); useLogReplayV1 = context.getBoolean( FileChannelConfiguration.USE_LOG_REPLAY_V1, @@ -285,11 +258,9 @@ public class FileChannel extends BasicChannelSemantics { builder.setMaxFileSize(maxFileSize); builder.setMinimumRequiredSpace(minimumRequiredSpace); builder.setQueueSize(capacity); - builder.setLogWriteTimeout(logWriteTimeout); builder.setCheckpointDir(checkpointDir); builder.setLogDirs(dataDirs); builder.setChannelName(getName()); - builder.setCheckpointWriteTimeout(checkpointWriteTimeout); builder.setUseLogReplayV1(useLogReplayV1); builder.setUseFastReplay(useFastReplay); builder.setEncryptionKeyProvider(encryptionKeyProvider); @@ -471,13 +442,8 @@ public class FileChannel extends BasicChannelSemantics { + channelNameDescriptor); } boolean success = false; - boolean lockAcquired = log.tryLockShared(); + log.lockShared(); try { - if(!lockAcquired) { - throw new ChannelException("Failed to obtain lock for writing to the " - + "log. Try increasing the log write timeout value. " + - channelNameDescriptor); - } FlumeEventPointer ptr = log.put(transactionID, event); Preconditions.checkState(putList.offer(ptr), "putList offer failed " + channelNameDescriptor); @@ -487,9 +453,7 @@ public class FileChannel extends BasicChannelSemantics { throw new ChannelException("Put failed due to IO error " + channelNameDescriptor, e); } finally { - if(lockAcquired) { - log.unlockShared(); - } + log.unlockShared(); if(!success) { // release slot obtained in the case // the put fails for any reason @@ -507,12 +471,7 @@ public class FileChannel extends BasicChannelSemantics { "increasing capacity, or increasing thread count. " + channelNameDescriptor); } - if(!log.tryLockShared()) { - throw new ChannelException("Failed to obtain lock for writing to the " - + "log. Try increasing the log write timeout value. " + - channelNameDescriptor); - } - + log.lockShared(); /* * 1. Take an event which is in the queue. * 2. If getting that event does not throw NoopRecordException, @@ -557,11 +516,7 @@ public class FileChannel extends BasicChannelSemantics { if(puts > 0) { Preconditions.checkState(takes == 0, "nonzero puts and takes " + channelNameDescriptor); - if(!log.tryLockShared()) { - throw new ChannelException("Failed to obtain lock for writing to the " - + "log. Try increasing the log write timeout value. " + - channelNameDescriptor); - } + log.lockShared(); try { log.commitPut(transactionID); channelCounter.addToEventPutSuccessCount(puts); @@ -589,11 +544,7 @@ public class FileChannel extends BasicChannelSemantics { } } else if (takes > 0) { - if(!log.tryLockShared()) { - throw new ChannelException("Failed to obtain lock for writing to the " - + "log. Try increasing the log write timeout value. " + - channelNameDescriptor); - } + log.lockShared(); try { log.commitTake(transactionID); queue.completeTransaction(transactionID); @@ -614,13 +565,8 @@ public class FileChannel extends BasicChannelSemantics { protected void doRollback() throws InterruptedException { int puts = putList.size(); int takes = takeList.size(); - boolean lockAcquired = log.tryLockShared(); + log.lockShared(); try { - if(!lockAcquired) { - throw new ChannelException("Failed to obtain lock for writing to the " - + "log. Try increasing the log write timeout value. " + - channelNameDescriptor); - } if(takes > 0) { Preconditions.checkState(puts == 0, "nonzero puts and takes " + channelNameDescriptor); @@ -641,9 +587,7 @@ public class FileChannel extends BasicChannelSemantics { throw new ChannelException("Commit failed due to IO error " + channelNameDescriptor, e); } finally { - if(lockAcquired) { - log.unlockShared(); - } + log.unlockShared(); // since rollback is being called, puts will never make it on // to the queue and we need to be sure to release the resources queueRemaining.release(puts); http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java index 10ca11f..e4bc879 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java @@ -76,19 +76,6 @@ public class FileChannelConfiguration { public static final int DEFAULT_KEEP_ALIVE = 3; /** - * The amount of time in seconds a writer will wait before failing when - * checkpoint is enqueued or in progress. - */ - public static final String LOG_WRITE_TIMEOUT = "write-timeout"; - public static final int DEFAULT_WRITE_TIMEOUT = 10; - - /** - * The amount of time in seconds the channel should wait to write the - * checkpoint when some other operation(s) are enqueued or in progress. - */ - public static final String CHECKPOINT_WRITE_TIMEOUT = "checkpoint-timeout"; - public static final int DEFAULT_CHECKPOINT_WRITE_TIMEOUT = 600; - /** * Turn on Flume 1.2 log replay logic */ public static final String USE_LOG_REPLAY_V1 = "use-log-replay-v1"; http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 70106cb..579ee35 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -26,7 +26,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.io.FileUtils; -import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; @@ -66,8 +65,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; * the on disk write ahead log with the last checkpoint of the queue. * * Before calling any of commitPut/commitTake/get/put/rollback/take - * Log.tryLockShared should be called and the above operations - * should only be called if tryLockShared returns true. After + * {@linkplain org.apache.flume.channel.file.Log#lockShared()} + * should be called. After * the operation and any additional modifications of the * FlumeEventQueue, the Log.unlockShared method should be called. */ @@ -114,9 +113,7 @@ public class Log { * Exclusive lock */ private final WriteLock checkpointWriterLock = checkpointLock.writeLock(); - private int logWriteTimeout; private final String channelNameDescriptor; - private int checkpointWriteTimeout; private boolean useLogReplayV1; private KeyProvider encryptionKeyProvider; private String encryptionCipherProvider; @@ -143,11 +140,7 @@ public class Log { private int bQueueCapacity; private File bCheckpointDir; private File[] bLogDirs; - private int bLogWriteTimeout = - FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT; private String bName; - private int bCheckpointWriteTimeout = - FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT; private boolean useLogReplayV1; private boolean useFastReplay; private KeyProvider bEncryptionKeyProvider; @@ -187,11 +180,6 @@ public class Log { return this; } - Builder setLogWriteTimeout(int timeout) { - bLogWriteTimeout = timeout; - return this; - } - Builder setChannelName(String name) { bName = name; return this; @@ -202,11 +190,6 @@ public class Log { return this; } - Builder setCheckpointWriteTimeout(int checkpointTimeout){ - bCheckpointWriteTimeout = checkpointTimeout; - return this; - } - Builder setUseLogReplayV1(boolean useLogReplayV1){ this.useLogReplayV1 = useLogReplayV1; return this; @@ -244,23 +227,21 @@ public class Log { Log build() throws IOException { return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity, - bLogWriteTimeout, bCheckpointWriteTimeout, bUseDualCheckpoints, - bCheckpointDir, bBackupCheckpointDir, bName, - useLogReplayV1, useFastReplay, bMinimumRequiredSpace, - bEncryptionKeyProvider, bEncryptionKeyAlias, - bEncryptionCipherProvider, bUsableSpaceRefreshInterval, - bLogDirs); + bUseDualCheckpoints, bCheckpointDir, bBackupCheckpointDir, bName, + useLogReplayV1, useFastReplay, bMinimumRequiredSpace, + bEncryptionKeyProvider, bEncryptionKeyAlias, + bEncryptionCipherProvider, bUsableSpaceRefreshInterval, + bLogDirs); } } private Log(long checkpointInterval, long maxFileSize, int queueCapacity, - int logWriteTimeout, int checkpointWriteTimeout, - boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir, - String name, boolean useLogReplayV1, boolean useFastReplay, - long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider, - @Nullable String encryptionKeyAlias, - @Nullable String encryptionCipherProvider, - long usableSpaceRefreshInterval, File... logDirs) + boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir, + String name, boolean useLogReplayV1, boolean useFastReplay, + long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider, + @Nullable String encryptionKeyAlias, + @Nullable String encryptionCipherProvider, + long usableSpaceRefreshInterval, File... logDirs) throws IOException { Preconditions.checkArgument(checkpointInterval > 0, "checkpointInterval <= 0"); @@ -337,8 +318,6 @@ public class Log { this.checkpointDir = checkpointDir; this.backupCheckpointDir = backupCheckpointDir; this.logDirs = logDirs; - this.logWriteTimeout = logWriteTimeout; - this.checkpointWriteTimeout = checkpointWriteTimeout; logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length); workerExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name) @@ -356,9 +335,7 @@ public class Log { void replay() throws IOException { Preconditions.checkState(!open, "Cannot replay after Log has been opened"); - Preconditions.checkState(tryLockExclusive(), "Cannot obtain lock on " - + channelNameDescriptor); - + lockExclusive(); try { /* * First we are going to look through the data directories @@ -751,28 +728,12 @@ public class Log { } - private boolean tryLockExclusive() { - try { - return checkpointWriterLock.tryLock(checkpointWriteTimeout, - TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log exclusive lock", ex); - Thread.currentThread().interrupt(); - } - return false; - } private void unlockExclusive() { checkpointWriterLock.unlock(); } - boolean tryLockShared() { - try { - return checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - LOGGER.warn("Interrupted while waiting for log shared lock", ex); - Thread.currentThread().interrupt(); - } - return false; + void lockShared() { + checkpointReadLock.lock(); } void unlockShared() { @@ -929,29 +890,25 @@ public class Log { * @param index * @throws IOException */ - private synchronized void roll(int index, ByteBuffer buffer) - throws IOException { - if (!tryLockShared()) { - throw new ChannelException("Failed to obtain lock for writing to the " - + "log. Try increasing the log write timeout value. " + - channelNameDescriptor); - } + private synchronized void roll(int index, ByteBuffer buffer) + throws IOException { + lockShared(); try { LogFile.Writer oldLogFile = logFiles.get(index); // check to make sure a roll is actually required due to // the possibility of multiple writes waiting on lock - if(oldLogFile == null || buffer == null || - oldLogFile.isRollRequired(buffer)) { + if (oldLogFile == null || buffer == null || + oldLogFile.isRollRequired(buffer)) { try { LOGGER.info("Roll start " + logDirs[index]); int fileID = nextFileID.incrementAndGet(); File file = new File(logDirs[index], PREFIX + fileID); LogFile.Writer writer = LogFileFactory.getWriter(file, fileID, - maxFileSize, encryptionKey, encryptionKeyAlias, - encryptionCipherProvider, usableSpaceRefreshInterval); + maxFileSize, encryptionKey, encryptionKeyAlias, + encryptionCipherProvider, usableSpaceRefreshInterval); idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file, - encryptionKeyProvider)); + encryptionKeyProvider)); // writer from this point on will get new reference logFiles.set(index, writer); // close out old log @@ -988,10 +945,7 @@ public class Log { throw new IOException("Usable space exhaused, only " + usableSpace + " bytes remaining, required " + minimumRequiredSpace + " bytes"); } - boolean lockAcquired = tryLockExclusive(); - if(!lockAcquired) { - return false; - } + lockExclusive(); SortedSet<Integer> logFileRefCountsAll = null, logFileRefCountsActive = null; try { if (queue.checkpoint(force)) { http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index d120a74..1ec5a22 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2180,8 +2180,6 @@ maxFileSize 2146435071 minimumRequiredSpace 524288000 Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value capacity 1000000 Maximum capacity of the channel keep-alive 3 Amount of time (in sec) to wait for a put operation -write-timeout 10 Amount of time (in sec) to wait for a write operation -checkpoint-timeout 600 Expert: Amount of time (in sec) to wait for a checkpoint use-log-replay-v1 false Expert: Use old replay logic use-fast-replay false Expert: Replay without using queue encryption.activeKey -- Key name used to encrypt new data
