Repository: flume Updated Branches: refs/heads/trunk a94594dd2 -> 6115e7d6d
FLUME-2181 - Optionally disable File Channel fsyncs (Hari via Brock) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6115e7d6 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6115e7d6 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6115e7d6 Branch: refs/heads/trunk Commit: 6115e7d6d611d2b82dc2583b95a13d4c0886a93f Parents: a94594d Author: Brock Noland <[email protected]> Authored: Fri May 2 07:32:33 2014 -0700 Committer: Brock Noland <[email protected]> Committed: Fri May 2 07:32:33 2014 -0700 ---------------------------------------------------------------------- .../flume/channel/file/CheckpointRebuilder.java | 11 ++- .../apache/flume/channel/file/FileChannel.java | 26 ++++++- .../channel/file/FileChannelConfiguration.java | 7 ++ .../java/org/apache/flume/channel/file/Log.java | 54 ++++++++++--- .../org/apache/flume/channel/file/LogFile.java | 81 +++++++++++++++++--- .../flume/channel/file/LogFileFactory.java | 16 ++-- .../apache/flume/channel/file/LogFileV2.java | 5 +- .../apache/flume/channel/file/LogFileV3.java | 61 +++++++++++---- .../flume/channel/file/ReplayHandler.java | 12 ++- .../channel/file/TransactionEventRecord.java | 4 + .../encryption/AESCTRNoPaddingProvider.java | 5 +- .../encryption/DecryptionFailureException.java | 38 +++++++++ .../channel/file/TestCheckpointRebuilder.java | 2 +- .../flume/channel/file/TestFileChannel.java | 35 +++++++-- .../org/apache/flume/channel/file/TestLog.java | 20 ++--- .../apache/flume/channel/file/TestLogFile.java | 18 ++--- .../apache/flume/channel/file/TestUtils.java | 4 +- .../flume/tools/FileChannelIntegrityTool.java | 2 +- .../tools/TestFileChannelIntegrityTool.java | 2 +- 19 files changed, 314 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java index 4388181..b961ae2 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -49,14 +49,17 @@ public class CheckpointRebuilder { HashMultimap.create(); private final SetMultimap<Long, ComparableFlumeEventPointer> uncommittedTakes = HashMultimap.create(); + private final boolean fsyncPerTransaction; private static Logger LOG = LoggerFactory.getLogger(CheckpointRebuilder.class); public CheckpointRebuilder(List<File> logFiles, - FlumeEventQueue queue) throws IOException { + FlumeEventQueue queue, boolean fsyncPerTransaction) throws + IOException { this.logFiles = logFiles; this.queue = queue; + this.fsyncPerTransaction = fsyncPerTransaction; } public boolean rebuild() throws IOException, Exception { @@ -64,7 +67,8 @@ public class CheckpointRebuilder { List<LogFile.SequentialReader> logReaders = Lists.newArrayList(); for (File logFile : logFiles) { try { - logReaders.add(LogFileFactory.getSequentialReader(logFile, null)); + logReaders.add(LogFileFactory.getSequentialReader(logFile, null, + fsyncPerTransaction)); } catch(EOFException e) { LOG.warn("Ignoring " + logFile + " due to EOF", e); } @@ -252,7 +256,8 @@ public class CheckpointRebuilder { new File(checkpointDir, "inflighttakes"), new File(checkpointDir, "inflightputs"), new File(checkpointDir, Log.QUEUE_SET)); - CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue); + CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, + queue, true); if(rebuilder.rebuild()) { rebuilder.writeCheckpoint(); } else { http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index 5203ca1..0f242d2 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -95,6 +95,8 @@ public class FileChannel extends BasicChannelSemantics { private String encryptionActiveKey; private String encryptionCipherProvider; private boolean useDualCheckpoints; + private boolean fsyncPerTransaction; + private int fsyncInterval; @Override public synchronized void setName(String name) { @@ -233,6 +235,12 @@ public class FileChannel extends BasicChannelSemantics { "key provider name is not."); } + fsyncPerTransaction = context.getBoolean(FileChannelConfiguration + .FSYNC_PER_TXN, FileChannelConfiguration.DEFAULT_FSYNC_PRE_TXN); + + fsyncInterval = context.getInteger(FileChannelConfiguration + .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL); + if(queueRemaining == null) { queueRemaining = new Semaphore(capacity, true); } @@ -265,6 +273,8 @@ public class FileChannel extends BasicChannelSemantics { builder.setEncryptionCipherProvider(encryptionCipherProvider); builder.setUseDualCheckpoints(useDualCheckpoints); builder.setBackupCheckpointDir(backupCheckpointDir); + builder.setFsyncPerTransaction(fsyncPerTransaction); + builder.setFsyncInterval(fsyncInterval); log = builder.build(); log.replay(); open = true; @@ -328,8 +338,8 @@ public class FileChannel extends BasicChannelSemantics { trans.getStateAsString() + channelNameDescriptor); } trans = new FileBackedTransaction(log, TransactionIDOracle.next(), - transactionCapacity, keepAlive, queueRemaining, getName(), - channelCounter); + transactionCapacity, keepAlive, queueRemaining, getName(), + fsyncPerTransaction, channelCounter); transactions.set(trans); return trans; } @@ -401,9 +411,11 @@ public class FileChannel extends BasicChannelSemantics { private final Semaphore queueRemaining; private final String channelNameDescriptor; private final ChannelCounter channelCounter; + private final boolean fsyncPerTransaction; public FileBackedTransaction(Log log, long transactionID, int transCapacity, int keepAlive, Semaphore queueRemaining, - String name, ChannelCounter counter) { + String name, boolean fsyncPerTransaction, ChannelCounter + counter) { this.log = log; queue = log.getFlumeEventQueue(); this.transactionID = transactionID; @@ -411,6 +423,7 @@ public class FileChannel extends BasicChannelSemantics { this.queueRemaining = queueRemaining; putList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity); takeList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity); + this.fsyncPerTransaction = fsyncPerTransaction; channelNameDescriptor = "[channel=" + name + "]"; this.channelCounter = counter; } @@ -500,6 +513,13 @@ public class FileChannel extends BasicChannelSemantics { LOG.warn("Corrupt record replaced by File Channel Integrity " + "tool found. Will retrieve next event", e); takeList.remove(ptr); + } catch (CorruptEventException ex) { + if (fsyncPerTransaction) { + throw new ChannelException(ex); + } + LOG.warn("Corrupt record found. Event will be " + + "skipped, and next event will be read.", ex); + takeList.remove(ptr); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java index e4bc879..87dc653 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java @@ -87,4 +87,11 @@ public class FileChannelConfiguration { public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints"; public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false; + public static final String FSYNC_PER_TXN = "fsyncPerTransaction"; + public static final boolean DEFAULT_FSYNC_PRE_TXN = true; + + public static final String FSYNC_INTERVAL = "fsyncInterval"; + public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds. + + } http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 579ee35..5bac0f4 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -125,6 +125,9 @@ public class Log { private final boolean useDualCheckpoints; private volatile boolean backupRestored = false; + private final boolean fsyncPerTransaction; + private final int fsyncInterval; + private int readCount; private int putCount; private int takeCount; @@ -150,6 +153,25 @@ public class Log { private boolean bUseDualCheckpoints = false; private File bBackupCheckpointDir = null; + private boolean fsyncPerTransaction = true; + private int fsyncInterval; + + boolean isFsyncPerTransaction() { + return fsyncPerTransaction; + } + + void setFsyncPerTransaction(boolean fsyncPerTransaction) { + this.fsyncPerTransaction = fsyncPerTransaction; + } + + int getFsyncInterval() { + return fsyncInterval; + } + + void setFsyncInterval(int fsyncInterval) { + this.fsyncInterval = fsyncInterval; + } + Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) { bUsableSpaceRefreshInterval = usableSpaceRefreshInterval; return this; @@ -231,7 +253,7 @@ public class Log { useLogReplayV1, useFastReplay, bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias, bEncryptionCipherProvider, bUsableSpaceRefreshInterval, - bLogDirs); + fsyncPerTransaction, fsyncInterval, bLogDirs); } } @@ -241,7 +263,8 @@ public class Log { long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider, @Nullable String encryptionKeyAlias, @Nullable String encryptionCipherProvider, - long usableSpaceRefreshInterval, File... logDirs) + long usableSpaceRefreshInterval, boolean fsyncPerTransaction, + int fsyncInterval, File... logDirs) throws IOException { Preconditions.checkArgument(checkpointInterval > 0, "checkpointInterval <= 0"); @@ -318,6 +341,8 @@ public class Log { this.checkpointDir = checkpointDir; this.backupCheckpointDir = backupCheckpointDir; this.logDirs = logDirs; + this.fsyncPerTransaction = fsyncPerTransaction; + this.fsyncInterval = fsyncInterval; logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length); workerExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name) @@ -354,7 +379,7 @@ public class Log { dataFiles.add(file); nextFileID.set(Math.max(nextFileID.get(), id)); idLogFileMap.put(id, LogFileFactory.getRandomReader(new File(logDir, - PREFIX + id), encryptionKeyProvider)); + PREFIX + id), encryptionKeyProvider, fsyncPerTransaction)); } } LOGGER.info("Found NextFileID " + nextFileID + @@ -468,13 +493,13 @@ public class Log { KeyProvider encryptionKeyProvider, boolean useFastReplay) throws Exception { CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles, - queue); + queue, fsyncPerTransaction); if (useFastReplay && rebuilder.rebuild()) { didFastReplay = true; LOGGER.info("Fast replay successful."); } else { ReplayHandler replayHandler = new ReplayHandler(queue, - encryptionKeyProvider); + encryptionKeyProvider, fsyncPerTransaction); if (useLogReplayV1) { LOGGER.info("Replaying logs with v1 replay logic"); replayHandler.replayLogv1(dataFiles); @@ -551,7 +576,7 @@ public class Log { * @throws InterruptedException */ FlumeEvent get(FlumeEventPointer pointer) throws IOException, - InterruptedException, NoopRecordException { + InterruptedException, NoopRecordException, CorruptEventException { Preconditions.checkState(open, "Log is closed"); int id = pointer.getFileID(); LogFile.RandomReader logFile = idLogFileMap.get(id); @@ -559,9 +584,12 @@ public class Log { try { return logFile.get(pointer.getOffset()); } catch (CorruptEventException ex) { - open = false; - throw new IOException("Corrupt event found. Please run File Channel " + - "Integrity tool.", ex); + if (fsyncPerTransaction) { + open = false; + throw new IOException("Corrupt event found. Please run File Channel " + + "Integrity tool.", ex); + } + throw ex; } } @@ -906,9 +934,10 @@ public class Log { File file = new File(logDirs[index], PREFIX + fileID); LogFile.Writer writer = LogFileFactory.getWriter(file, fileID, maxFileSize, encryptionKey, encryptionKeyAlias, - encryptionCipherProvider, usableSpaceRefreshInterval); + encryptionCipherProvider, usableSpaceRefreshInterval, + fsyncPerTransaction, fsyncInterval); idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file, - encryptionKeyProvider)); + encryptionKeyProvider, fsyncPerTransaction)); // writer from this point on will get new reference logFiles.set(index, writer); // close out old log @@ -991,7 +1020,8 @@ public class Log { } finally { writer.close(); } - reader = LogFileFactory.getRandomReader(file, encryptionKeyProvider); + reader = LogFileFactory.getRandomReader(file, + encryptionKeyProvider, fsyncPerTransaction); idLogFileMap.put(id, reader); LOGGER.debug("Updated checkpoint for file: " + file + "logWriteOrderID " + logWriteOrderID); http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java index 26a24b1..488dcf4 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java @@ -41,6 +41,9 @@ import java.nio.channels.FileChannel; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @InterfaceAudience.Private @@ -169,13 +172,18 @@ public abstract class LogFile { private long lastCommitPosition; private long lastSyncPosition; + private final boolean fsyncPerTransaction; + private final int fsyncInterval; + private final ScheduledExecutorService syncExecutor; + private volatile boolean dirty = false; + // To ensure we can count the number of fsyncs. private long syncCount; Writer(File file, int logFileID, long maxFileSize, - CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval) - throws IOException { + CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval, + boolean fsyncPerTransaction, int fsyncInterval) throws IOException { this.file = file; this.logFileID = logFileID; this.maxFileSize = Math.min(maxFileSize, @@ -183,6 +191,25 @@ public abstract class LogFile { this.encryptor = encryptor; writeFileHandle = new RandomAccessFile(file, "rw"); writeFileChannel = writeFileHandle.getChannel(); + this.fsyncPerTransaction = fsyncPerTransaction; + this.fsyncInterval = fsyncInterval; + if(!fsyncPerTransaction) { + LOG.info("Sync interval = " + fsyncInterval); + syncExecutor = Executors.newSingleThreadScheduledExecutor(); + syncExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + sync(); + } catch (Throwable ex) { + LOG.error("Data file, " + getFile().toString() + " could not " + + "be synced to disk due to an error.", ex); + } + } + }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS); + } else { + syncExecutor = null; + } usableSpace = new CachedFSUsableSpace(file, usableSpaceRefreshInterval); LOG.info("Opened " + file); open = true; @@ -258,6 +285,7 @@ public abstract class LogFile { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } write(buffer); + dirty = true; lastCommitPosition = position(); } @@ -299,6 +327,14 @@ public abstract class LogFile { * @throws LogFileRetryableIOException - if this log file is closed. */ synchronized void sync() throws IOException { + if (!fsyncPerTransaction && !dirty) { + if(LOG.isDebugEnabled()) { + LOG.debug( + "No events written to file, " + getFile().toString() + + " in last " + fsyncInterval + " or since last commit."); + } + return; + } if (!isOpen()) { throw new LogFileRetryableIOException("File closed " + file); } @@ -306,6 +342,7 @@ public abstract class LogFile { getFileChannel().force(false); lastSyncPosition = position(); syncCount++; + dirty = false; } } @@ -322,6 +359,13 @@ public abstract class LogFile { synchronized void close() { if(open) { open = false; + if (!fsyncPerTransaction) { + // Shutdown the executor before attempting to close. + if(syncExecutor != null) { + // No need to wait for it to shutdown. + syncExecutor.shutdown(); + } + } if(writeFileChannel.isOpen()) { LOG.info("Closing " + file); try { @@ -396,12 +440,15 @@ public abstract class LogFile { private final BlockingQueue<RandomAccessFile> readFileHandles = new ArrayBlockingQueue<RandomAccessFile>(50, true); private final KeyProvider encryptionKeyProvider; + private final boolean fsyncPerTransaction; private volatile boolean open; - public RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider) + public RandomReader(File file, @Nullable KeyProvider + encryptionKeyProvider, boolean fsyncPerTransaction) throws IOException { this.file = file; this.encryptionKeyProvider = encryptionKeyProvider; readFileHandles.add(open()); + this.fsyncPerTransaction = fsyncPerTransaction; open = true; } @@ -430,8 +477,11 @@ public abstract class LogFile { throw new NoopRecordException("No op record found. Corrupt record " + "may have been repaired by File Channel Integrity tool"); } - Preconditions.checkState(operation == OP_RECORD, - Integer.toHexString(operation)); + if (operation != OP_RECORD) { + throw new CorruptEventException( + "Operation code is invalid. File " + + "is corrupt. Please run File Channel Integrity tool."); + } TransactionEventRecord record = doGet(fileHandle); if(!(record instanceof Put)) { Preconditions.checkState(false, "Record is " + @@ -491,8 +541,8 @@ public abstract class LogFile { } int remaining = readFileHandles.remainingCapacity(); if(remaining > 0) { - LOG.info("Opening " + file + " for read, remaining capacity is " - + remaining); + LOG.info("Opening " + file + " for read, remaining number of file " + + "handles available for reads of this file is " + remaining); return open(); } return readFileHandles.take(); @@ -647,11 +697,20 @@ public abstract class LogFile { output.put(buffer); } protected static byte[] readDelimitedBuffer(RandomAccessFile fileHandle) - throws IOException { + throws IOException, CorruptEventException { int length = fileHandle.readInt(); - Preconditions.checkState(length >= 0, Integer.toHexString(length)); + if (length < 0) { + throw new CorruptEventException("Length of event is: " + String.valueOf + (length) + ". Event must have length >= 0. Possible corruption of " + + "data or partial fsync."); + } byte[] buffer = new byte[length]; - fileHandle.readFully(buffer); + try { + fileHandle.readFully(buffer); + } catch (EOFException ex) { + throw new CorruptEventException("Remaining data in file less than " + + "expected size of event.", ex); + } return buffer; } @@ -659,7 +718,7 @@ public abstract class LogFile { File file = new File(args[0]); LogFile.SequentialReader reader = null; try { - reader = LogFileFactory.getSequentialReader(file, null); + reader = LogFileFactory.getSequentialReader(file, null, false); LogRecord entry; FlumeEventPointer ptr; // for puts the fileId is the fileID of the file they exist in http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java index 9c98d8c..7d7fd85 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java @@ -68,17 +68,19 @@ class LogFileFactory { long maxFileSize, @Nullable Key encryptionKey, @Nullable String encryptionKeyAlias, @Nullable String encryptionCipherProvider, - long usableSpaceRefreshInterval) throws IOException { + long usableSpaceRefreshInterval, boolean fsyncPerTransaction, + int fsyncInterval) throws IOException { Preconditions.checkState(!file.exists(), "File already exists " + file.getAbsolutePath()); Preconditions.checkState(file.createNewFile(), "File could not be created " + file.getAbsolutePath()); return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey, - encryptionKeyAlias, encryptionCipherProvider, usableSpaceRefreshInterval); + encryptionKeyAlias, encryptionCipherProvider, + usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval); } static LogFile.RandomReader getRandomReader(File file, - @Nullable KeyProvider encryptionKeyProvider) + @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction) throws IOException { RandomAccessFile logFile = new RandomAccessFile(file, "r"); try { @@ -86,7 +88,8 @@ class LogFileFactory { // either this is a rr for a just created file or // the metadata file exists and as such it's V3 if(logFile.length() == 0L || metaDataFile.exists()) { - return new LogFileV3.RandomReader(file, encryptionKeyProvider); + return new LogFileV3.RandomReader(file, encryptionKeyProvider, + fsyncPerTransaction); } int version = logFile.readInt(); if(Serialization.VERSION_2 == version) { @@ -106,7 +109,7 @@ class LogFileFactory { } static LogFile.SequentialReader getSequentialReader(File file, - @Nullable KeyProvider encryptionKeyProvider) + @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction) throws IOException { RandomAccessFile logFile = null; try { @@ -159,7 +162,8 @@ class LogFileFactory { throw new EOFException(String.format("MetaData file %s is empty", metaDataFile)); } - return new LogFileV3.SequentialReader(file, encryptionKeyProvider); + return new LogFileV3.SequentialReader(file, encryptionKeyProvider, + fsyncPerTransaction); } logFile = new RandomAccessFile(file, "r"); int version = logFile.readInt(); http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java index f286c57..bb25e95 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java @@ -96,7 +96,8 @@ class LogFileV2 extends LogFile { Writer(File file, int logFileID, long maxFileSize, long usableSpaceRefreshInterval) throws IOException { - super(file, logFileID, maxFileSize, null, usableSpaceRefreshInterval); + super(file, logFileID, maxFileSize, null, usableSpaceRefreshInterval, + true, 0); RandomAccessFile writeFileHandle = getFileHandle(); writeFileHandle.writeInt(getVersion()); writeFileHandle.writeInt(logFileID); @@ -116,7 +117,7 @@ class LogFileV2 extends LogFile { static class RandomReader extends LogFile.RandomReader { RandomReader(File file) throws IOException { - super(file, null); + super(file, null, true); } @Override int getVersion() { http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index 38f6ecb..9b0ef93 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -21,10 +21,12 @@ package org.apache.flume.channel.file; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessage; +import org.apache.flume.Transaction; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.encryption.CipherProvider; import org.apache.flume.channel.file.encryption.CipherProviderFactory; +import org.apache.flume.channel.file.encryption.DecryptionFailureException; import org.apache.flume.channel.file.encryption.KeyProvider; import org.apache.flume.channel.file.proto.ProtosFactory; import org.slf4j.Logger; @@ -178,11 +180,11 @@ public class LogFileV3 extends LogFile { @Nullable Key encryptionKey, @Nullable String encryptionKeyAlias, @Nullable String encryptionCipherProvider, - long usableSpaceRefreshInterval) - throws IOException { + long usableSpaceRefreshInterval, boolean fsyncPerTransaction, + int fsyncInterval) throws IOException { super(file, logFileID, maxFileSize, CipherProviderFactory. getEncrypter(encryptionCipherProvider, encryptionKey), - usableSpaceRefreshInterval); + usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval); ProtosFactory.LogFileMetaData.Builder metaDataBuilder = ProtosFactory.LogFileMetaData.newBuilder(); if(encryptionKey != null) { @@ -219,10 +221,11 @@ public class LogFileV3 extends LogFile { private volatile String cipherProvider; private volatile byte[] parameters; private BlockingQueue<CipherProvider.Decryptor> decryptors = - new LinkedBlockingDeque<CipherProvider.Decryptor>(); - RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider) - throws IOException { - super(file, encryptionKeyProvider); + new LinkedBlockingDeque<CipherProvider.Decryptor>(); + + RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider, + boolean fsyncPerTransaction) throws IOException { + super(file, encryptionKeyProvider, fsyncPerTransaction); } private void initialize() throws IOException { File metaDataFile = Serialization.getMetaDataFile(getFile()); @@ -281,10 +284,10 @@ public class LogFileV3 extends LogFile { initialize(); } } - byte[] buffer = readDelimitedBuffer(fileHandle); - CipherProvider.Decryptor decryptor = null; boolean success = false; + CipherProvider.Decryptor decryptor = null; try { + byte[] buffer = readDelimitedBuffer(fileHandle); if(encryptionEnabled) { decryptor = getDecryptor(); buffer = decryptor.decrypt(buffer); @@ -293,6 +296,8 @@ public class LogFileV3 extends LogFile { fromByteArray(buffer); success = true; return event; + } catch(DecryptionFailureException ex) { + throw new CorruptEventException("Error decrypting event", ex); } finally { if(success && encryptionEnabled && decryptor != null) { decryptors.offer(decryptor); @@ -303,10 +308,12 @@ public class LogFileV3 extends LogFile { public static class SequentialReader extends LogFile.SequentialReader { private CipherProvider.Decryptor decryptor; - + private final boolean fsyncPerTransaction; public SequentialReader(File file, @Nullable KeyProvider - encryptionKeyProvider) throws EOFException, IOException { + encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException, + IOException { super(file, encryptionKeyProvider); + this.fsyncPerTransaction = fsyncPerTransaction; File metaDataFile = Serialization.getMetaDataFile(file); FileInputStream inputStream = new FileInputStream(metaDataFile); try { @@ -351,13 +358,33 @@ public class LogFileV3 extends LogFile { } @Override - LogRecord doNext(int offset) throws IOException, CorruptEventException { - byte[] buffer = readDelimitedBuffer(getFileHandle()); - if(decryptor != null) { - buffer = decryptor.decrypt(buffer); + LogRecord doNext(int offset) throws IOException, CorruptEventException, + DecryptionFailureException { + byte[] buffer = null; + TransactionEventRecord event = null; + try { + buffer = readDelimitedBuffer(getFileHandle()); + if (decryptor != null) { + buffer = decryptor.decrypt(buffer); + } + event = TransactionEventRecord.fromByteArray(buffer); + } catch (CorruptEventException ex) { + LOGGER.warn("Corrupt file found. File id: log-" + this.getLogFileID(), + ex); + // Return null so that replay handler thinks all events in this file + // have been taken. + if (!fsyncPerTransaction) { + return null; + } + throw ex; + } catch (DecryptionFailureException ex) { + if (!fsyncPerTransaction) { + LOGGER.warn("Could not decrypt even read from channel. Skipping " + + "event.", ex); + return null; + } + throw ex; } - TransactionEventRecord event = - TransactionEventRecord.fromByteArray(buffer); return new LogRecord(getLogFileID(), offset, event); } } http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java index e668c2e..a559503 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java @@ -52,6 +52,7 @@ class ReplayHandler { private final Map<Integer, LogFile.SequentialReader> readers; private final PriorityQueue<LogRecord> logRecordBuffer; private final KeyProvider encryptionKeyProvider; + private final boolean fsyncPerTransaction; /** * This data structure stores takes for which we found a commit in the log * files before we found a commit for the put. This can happen if the channel @@ -91,19 +92,22 @@ class ReplayHandler { public int getCommitCount() { return commitCount; } + @VisibleForTesting public int getRollbackCount() { return rollbackCount; } ReplayHandler(FlumeEventQueue queue, - @Nullable KeyProvider encryptionKeyProvider) { + @Nullable KeyProvider encryptionKeyProvider, + boolean fsyncPerTransaction) { this.queue = queue; this.lastCheckpoint = queue.getLogWriteOrderID(); pendingTakes = Lists.newArrayList(); readers = Maps.newHashMap(); logRecordBuffer = new PriorityQueue<LogRecord>(); this.encryptionKeyProvider = encryptionKeyProvider; + this.fsyncPerTransaction = fsyncPerTransaction; } /** * Replay logic from Flume1.2 which can be activated if the v2 logic @@ -129,7 +133,8 @@ class ReplayHandler { LOG.info("Replaying " + log); LogFile.SequentialReader reader = null; try { - reader = LogFileFactory.getSequentialReader(log, encryptionKeyProvider); + reader = LogFileFactory.getSequentialReader(log, + encryptionKeyProvider, fsyncPerTransaction); reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID()); LogRecord entry; FlumeEventPointer ptr; @@ -257,7 +262,8 @@ class ReplayHandler { LOG.info("Replaying " + log); try { LogFile.SequentialReader reader = - LogFileFactory.getSequentialReader(log, encryptionKeyProvider); + LogFileFactory.getSequentialReader(log, encryptionKeyProvider, + fsyncPerTransaction); reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID()); Preconditions.checkState(!readers.containsKey(reader.getLogFileID()), "Readers " + readers + " already contains " http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java index ea7f00c..1eb3f4f 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java @@ -29,6 +29,7 @@ import java.io.OutputStream; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.proto.ProtosFactory; @@ -207,6 +208,9 @@ public abstract class TransactionEventRecord implements Writable { ProtosFactory.TransactionEventFooter. parseDelimitedFrom(in), "Footer cannot be null"); return transactionEvent; + } catch (InvalidProtocolBufferException ex) { + throw new CorruptEventException( + "Could not parse event from data file.", ex); } finally { try { in.close(); http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java index d0a84fe..9ee4245 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java @@ -103,14 +103,15 @@ public class AESCTRNoPaddingProvider extends CipherProvider { } } - private static byte[] doFinal(Cipher cipher, byte[] input) { + private static byte[] doFinal(Cipher cipher, byte[] input) + throws DecryptionFailureException{ try { return cipher.doFinal(input); } catch (Exception e) { String msg = "Unable to encrypt or decrypt data " + TYPE + " input.length " + input.length; LOG.error(msg, e); - throw Throwables.propagate(e); + throw new DecryptionFailureException(msg, e); } } http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java new file mode 100644 index 0000000..0155c39 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file.encryption; + +import org.apache.flume.FlumeException; + +/** + * Exception that is thrown when the channel is unable to decrypt an even + * read from the channel. + */ +public class DecryptionFailureException extends FlumeException { + private static final long serialVersionUID = 6646810195384793646L; + + + public DecryptionFailureException(String msg) { + super(msg); + } + + public DecryptionFailureException(String msg, Throwable th) { + super(msg, th); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java index 621d445..c6c6ad3 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java @@ -74,7 +74,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase { FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile, queueSetDir); CheckpointRebuilder checkpointRebuilder = - new CheckpointRebuilder(getAllLogs(dataDirs), queue); + new CheckpointRebuilder(getAllLogs(dataDirs), queue, true); Assert.assertTrue(checkpointRebuilder.rebuild()); channel = createFileChannel(overrides); channel.start(); http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 25765b5..bb22e26 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -573,8 +574,25 @@ public class TestFileChannel extends TestFileChannelBase { } @Test (expected = IllegalStateException.class) - public void testChannelDiesOnCorruptEvent() throws Exception { - final FileChannel channel = createFileChannel(); + public void testChannelDiesOnCorruptEventFsync() throws Exception { + testChannelDiesOnCorruptEvent(true); + } + + + @Test + public void testChannelDiesOnCorruptEventNoFsync() throws + Exception { + testChannelDiesOnCorruptEvent(false); + } + + + + private void testChannelDiesOnCorruptEvent(boolean fsyncPerTxn) + throws Exception { + Map<String, String> overrides = new HashMap<String, String>(); + overrides.put(FileChannelConfiguration.FSYNC_PER_TXN, + String.valueOf(fsyncPerTxn)); + final FileChannel channel = createFileChannel(overrides); channel.start(); putEvents(channel,"test-corrupt-event",100,100); for(File dataDir : dataDirs) { @@ -596,8 +614,9 @@ public class TestFileChannel extends TestFileChannelBase { } } } + Set<String> events; try { - consumeChannel(channel, true); + events = consumeChannel(channel, true); } catch (IllegalStateException ex) { // The rollback call in takeEvents() in TestUtils will cause an // IllegalArgumentException - and this should be tested to verify the @@ -605,9 +624,13 @@ public class TestFileChannel extends TestFileChannelBase { Assert.assertTrue(ex.getMessage().contains("Log is closed")); throw ex; } - Assert.fail(); - - + if(fsyncPerTxn) { + Assert.fail(); + } else { + // The corrupt event must be missing, the rest should be + // returned + Assert.assertEquals(99, events.size()); + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index d1f51fc..c9a64ed 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -76,7 +76,7 @@ public class TestLog { */ @Test public void testPutGet() - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -89,7 +89,7 @@ public class TestLog { } @Test public void testRoll() - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { log.shutdownWorker(); Thread.sleep(1000); for (int i = 0; i < 1000; i++) { @@ -119,7 +119,7 @@ public class TestLog { */ @Test public void testPutCommit() - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn); @@ -247,16 +247,16 @@ public class TestLog { */ @Test public void testPutTakeRollbackLogReplayV1() - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { doPutTakeRollback(true); } @Test public void testPutTakeRollbackLogReplayV2() - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { doPutTakeRollback(false); } public void doPutTakeRollback(boolean useLogReplayV1) - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long putTransactionID = ++transactionID; FlumeEventPointer eventPointerIn = log.put(putTransactionID, eventIn); @@ -396,7 +396,7 @@ public class TestLog { } @Test public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay() - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -410,7 +410,7 @@ public class TestLog { } @Test public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay() - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -427,7 +427,7 @@ public class TestLog { } public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn, FlumeEventPointer eventPointer) throws IOException, - InterruptedException, NoopRecordException { + InterruptedException, NoopRecordException, CorruptEventException { for (int i = 0; i < dataDirs.length; i++) { for(File logFile : LogUtils.getLogs(dataDirs[i])) { if(logFile.length() == 0L) { @@ -467,7 +467,7 @@ public class TestLog { private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn) - throws IOException, InterruptedException, NoopRecordException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEventQueue queue = log.getFlumeEventQueue(); FlumeEventPointer eventPointerOut = queue.removeHead(0); Assert.assertNotNull(eventPointerOut); http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index e5d830e..976a112 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -63,7 +63,7 @@ public class TestLogFile { dataFile = new File(dataDir, String.valueOf(fileID)); Assert.assertTrue(dataDir.isDirectory()); logFileWriter = LogFileFactory.getWriter(dataFile, fileID, - Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE); + Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE, true, 0); } @After public void cleanup() throws IOException { @@ -80,7 +80,7 @@ public class TestLogFile { Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile()); try { LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, - null, Long.MAX_VALUE); + null, Long.MAX_VALUE, true, 0); Assert.fail(); } catch (IllegalStateException e) { Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), @@ -94,7 +94,7 @@ public class TestLogFile { Assert.assertTrue(dataFile.mkdirs()); try { LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, - null, Long.MAX_VALUE); + null, Long.MAX_VALUE, true, 0); Assert.fail(); } catch (IllegalStateException e) { Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), @@ -109,7 +109,7 @@ public class TestLogFile { CompletionService<Void> completionService = new ExecutorCompletionService <Void>(executorService); final LogFile.RandomReader logFileReader = - LogFileFactory.getRandomReader(dataFile, null); + LogFileFactory.getRandomReader(dataFile, null, true); for (int i = 0; i < 1000; i++) { // first try and throw failures synchronized (errors) { @@ -168,7 +168,7 @@ public class TestLogFile { puts.put(ptr.getOffset(), put); } LogFile.SequentialReader reader = - LogFileFactory.getSequentialReader(dataFile, null); + LogFileFactory.getSequentialReader(dataFile, null, true); LogRecord entry; while((entry = reader.next()) != null) { Integer offset = entry.getOffset(); @@ -202,7 +202,7 @@ public class TestLogFile { Assert.fail("Renaming to meta.old failed"); } LogFile.SequentialReader reader = - LogFileFactory.getSequentialReader(dataFile, null); + LogFileFactory.getSequentialReader(dataFile, null, true); Assert.assertTrue(metadataFile.exists()); Assert.assertFalse(oldMetadataFile.exists()); LogRecord entry; @@ -240,7 +240,7 @@ public class TestLogFile { Assert.fail("Renaming to meta.temp failed"); } LogFile.SequentialReader reader = - LogFileFactory.getSequentialReader(dataFile, null); + LogFileFactory.getSequentialReader(dataFile, null, true); Assert.assertTrue(metadataFile.exists()); Assert.assertFalse(tempMetadataFile.exists()); Assert.assertFalse(oldMetadataFile.exists()); @@ -281,7 +281,7 @@ public class TestLogFile { @Test (expected = CorruptEventException.class) public void testPutGetCorruptEvent() throws Exception { final LogFile.RandomReader logFileReader = - LogFileFactory.getRandomReader(dataFile, null); + LogFileFactory.getRandomReader(dataFile, null, true); final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn); @@ -306,7 +306,7 @@ public class TestLogFile { @Test (expected = NoopRecordException.class) public void testPutGetNoopEvent() throws Exception { final LogFile.RandomReader logFileReader = - LogFileFactory.getRandomReader(dataFile, null); + LogFileFactory.getRandomReader(dataFile, null, true); final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn); http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 0fb9bc2..61f38d2 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -190,9 +190,9 @@ public class TestUtils { result.add(new String(event.getBody(), Charsets.UTF_8)); } transaction.commit(); - } catch (Exception ex) { + } catch (Throwable ex) { transaction.rollback(); - throw ex; + throw new RuntimeException(ex); } finally { transaction.close(); } http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java index aa24fa5..d0753a6 100644 --- a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java +++ b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java @@ -70,7 +70,7 @@ public class FileChannelIntegrityTool implements FlumeTool { for (File dataFile : dataFiles) { LOG.info("Checking for corruption in " + dataFile.toString()); LogFile.SequentialReader reader = - new LogFileV3.SequentialReader(dataFile, null); + new LogFileV3.SequentialReader(dataFile, null, true); LogFile.OperationRecordUpdater updater = new LogFile .OperationRecordUpdater(dataFile); boolean fileDone = false; http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java index d328671..f24ae56 100644 --- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java +++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java @@ -120,7 +120,7 @@ public class TestFileChannelIntegrityTool { int corrupted = 0; for (File dataFile : files) { LogFile.SequentialReader reader = - new LogFileV3.SequentialReader(dataFile, null); + new LogFileV3.SequentialReader(dataFile, null, true); RandomAccessFile handle = new RandomAccessFile(dataFile, "rw"); long eventPosition1 = reader.getPosition(); LogRecord rec = reader.next();
