http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java index 488dcf4..336aa2c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java @@ -50,20 +50,17 @@ import java.util.concurrent.atomic.AtomicLong; @InterfaceStability.Unstable public abstract class LogFile { - private static final Logger LOG = LoggerFactory - .getLogger(LogFile.class); - + private static final Logger LOG = LoggerFactory.getLogger(LogFile.class); /** * This class preallocates the data files 1MB at time to avoid * the updating of the inode on each write and to avoid the disk * filling up during a write. It's also faster, so there. */ - private static final ByteBuffer FILL = DirectMemoryUtils. - allocate(1024 * 1024); // preallocation, 1MB + private static final ByteBuffer FILL = DirectMemoryUtils.allocate(1024 * 1024); public static final byte OP_RECORD = Byte.MAX_VALUE; - public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE)/2; + public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE) / 2; public static final byte OP_EOF = Byte.MIN_VALUE; static { @@ -73,7 +70,7 @@ public abstract class LogFile { } protected static void skipRecord(RandomAccessFile fileHandle, - int offset) throws IOException { + int offset) throws IOException { fileHandle.seek(offset); int length = fileHandle.readInt(); fileHandle.skipBytes(length); @@ -93,31 +90,40 @@ public abstract class LogFile { writeFileHandle = new RandomAccessFile(file, "rw"); } + protected RandomAccessFile getFileHandle() { return writeFileHandle; } + protected void setLastCheckpointOffset(long lastCheckpointOffset) { this.lastCheckpointOffset = lastCheckpointOffset; } + protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) { this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID; } + protected long getLastCheckpointOffset() { return lastCheckpointOffset; } + protected long getLastCheckpointWriteOrderID() { return lastCheckpointWriteOrderID; } + protected File getFile() { return file; } + protected int getLogFileID() { return logFileID; } + void markCheckpoint(long logWriteOrderID) throws IOException { markCheckpoint(lastCheckpointOffset, logWriteOrderID); } + abstract void markCheckpoint(long currentPosition, long logWriteOrderID) throws IOException; @@ -150,9 +156,10 @@ public abstract class LogFile { Preconditions.checkArgument(numBytes >= 0, "numBytes less than zero"); value.addAndGet(-numBytes); } + long getUsableSpace() { long now = System.currentTimeMillis(); - if(now - interval > lastRefresh.get()) { + if (now - interval > lastRefresh.get()) { value.set(fs.getUsableSpace()); lastRefresh.set(now); } @@ -160,7 +167,7 @@ public abstract class LogFile { } } - static abstract class Writer { + abstract static class Writer { private final int logFileID; private final File file; private final long maxFileSize; @@ -180,10 +187,9 @@ public abstract class LogFile { // To ensure we can count the number of fsyncs. private long syncCount; - Writer(File file, int logFileID, long maxFileSize, - CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval, - boolean fsyncPerTransaction, int fsyncInterval) throws IOException { + CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval, + boolean fsyncPerTransaction, int fsyncInterval) throws IOException { this.file = file; this.logFileID = logFileID; this.maxFileSize = Math.min(maxFileSize, @@ -193,7 +199,7 @@ public abstract class LogFile { writeFileChannel = writeFileHandle.getChannel(); this.fsyncPerTransaction = fsyncPerTransaction; this.fsyncInterval = fsyncInterval; - if(!fsyncPerTransaction) { + if (!fsyncPerTransaction) { LOG.info("Sync interval = " + fsyncInterval); syncExecutor = Executors.newSingleThreadScheduledExecutor(); syncExecutor.scheduleWithFixedDelay(new Runnable() { @@ -203,7 +209,7 @@ public abstract class LogFile { sync(); } catch (Throwable ex) { LOG.error("Data file, " + getFile().toString() + " could not " + - "be synced to disk due to an error.", ex); + "be synced to disk due to an error.", ex); } } }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS); @@ -220,6 +226,7 @@ public abstract class LogFile { protected CipherProvider.Encryptor getEncryptor() { return encryptor; } + int getLogFileID() { return logFileID; } @@ -227,6 +234,7 @@ public abstract class LogFile { File getFile() { return file; } + String getParent() { return file.getParent(); } @@ -240,7 +248,7 @@ public abstract class LogFile { } @VisibleForTesting - long getLastCommitPosition(){ + long getLastCommitPosition() { return lastCommitPosition; } @@ -253,6 +261,7 @@ public abstract class LogFile { long getSyncCount() { return syncCount; } + synchronized long position() throws IOException { return getFileChannel().position(); } @@ -261,20 +270,22 @@ public abstract class LogFile { // methods, so all methods need to be synchronized. synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException { - if(encryptor != null) { + if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } Pair<Integer, Integer> pair = write(buffer); return new FlumeEventPointer(pair.getLeft(), pair.getRight()); } + synchronized void take(ByteBuffer buffer) throws IOException { - if(encryptor != null) { + if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } write(buffer); } + synchronized void rollback(ByteBuffer buffer) throws IOException { - if(encryptor != null) { + if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } write(buffer); @@ -290,20 +301,20 @@ public abstract class LogFile { } private Pair<Integer, Integer> write(ByteBuffer buffer) - throws IOException { - if(!isOpen()) { + throws IOException { + if (!isOpen()) { throw new LogFileRetryableIOException("File closed " + file); } long length = position(); long expectedLength = length + (long) buffer.limit(); - if(expectedLength > maxFileSize) { + if (expectedLength > maxFileSize) { throw new LogFileRetryableIOException(expectedLength + " > " + maxFileSize); } - int offset = (int)length; + int offset = (int) length; Preconditions.checkState(offset >= 0, String.valueOf(offset)); // OP_RECORD + size + buffer - int recordLength = 1 + (int)Serialization.SIZE_OF_INT + buffer.limit(); + int recordLength = 1 + (int) Serialization.SIZE_OF_INT + buffer.limit(); usableSpace.decrement(recordLength); preallocate(recordLength); ByteBuffer toWrite = ByteBuffer.allocate(recordLength); @@ -323,15 +334,16 @@ public abstract class LogFile { * Sync the underlying log file to disk. Expensive call, * should be used only on commits. If a sync has already happened after * the last commit, this method is a no-op + * * @throws IOException * @throws LogFileRetryableIOException - if this log file is closed. */ synchronized void sync() throws IOException { if (!fsyncPerTransaction && !dirty) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug( - "No events written to file, " + getFile().toString() + - " in last " + fsyncInterval + " or since last commit."); + "No events written to file, " + getFile().toString() + + " in last " + fsyncInterval + " or since last commit."); } return; } @@ -346,27 +358,29 @@ public abstract class LogFile { } } - protected boolean isOpen() { return open; } + protected RandomAccessFile getFileHandle() { return writeFileHandle; } + protected FileChannel getFileChannel() { return writeFileChannel; } + synchronized void close() { - if(open) { + if (open) { open = false; if (!fsyncPerTransaction) { // Shutdown the executor before attempting to close. - if(syncExecutor != null) { + if (syncExecutor != null) { // No need to wait for it to shutdown. syncExecutor.shutdown(); } } - if(writeFileChannel.isOpen()) { + if (writeFileChannel.isOpen()) { LOG.info("Closing " + file); try { writeFileChannel.force(true); @@ -381,9 +395,10 @@ public abstract class LogFile { } } } + protected void preallocate(int size) throws IOException { long position = position(); - if(position + size > getFileChannel().size()) { + if (position + size > getFileChannel().size()) { LOG.debug("Preallocating at position " + position); synchronized (FILL) { FILL.position(0); @@ -404,7 +419,7 @@ public abstract class LogFile { public OperationRecordUpdater(File file) throws FileNotFoundException { Preconditions.checkState(file.exists(), "File to update, " + - file.toString() + " does not exist."); + file.toString() + " does not exist."); this.file = file; fileHandle = new RandomAccessFile(file, "rw"); } @@ -417,10 +432,10 @@ public abstract class LogFile { fileHandle.seek(offset); byte byteRead = fileHandle.readByte(); Preconditions.checkState(byteRead == OP_RECORD || byteRead == OP_NOOP, - "Expected to read a record but the byte read indicates EOF"); + "Expected to read a record but the byte read indicates EOF"); fileHandle.seek(offset); LOG.info("Marking event as " + OP_NOOP + " at " + offset + " for file " + - file.toString()); + file.toString()); fileHandle.writeByte(OP_NOOP); } @@ -430,20 +445,21 @@ public abstract class LogFile { fileHandle.close(); } catch (IOException e) { LOG.error("Could not close file handle to file " + - fileHandle.toString(), e); + fileHandle.toString(), e); } } } - static abstract class RandomReader { + abstract static class RandomReader { private final File file; private final BlockingQueue<RandomAccessFile> readFileHandles = new ArrayBlockingQueue<RandomAccessFile>(50, true); private final KeyProvider encryptionKeyProvider; private final boolean fsyncPerTransaction; private volatile boolean open; + public RandomReader(File file, @Nullable KeyProvider - encryptionKeyProvider, boolean fsyncPerTransaction) + encryptionKeyProvider, boolean fsyncPerTransaction) throws IOException { this.file = file; this.encryptionKeyProvider = encryptionKeyProvider; @@ -466,31 +482,31 @@ public abstract class LogFile { } FlumeEvent get(int offset) throws IOException, InterruptedException, - CorruptEventException, NoopRecordException { + CorruptEventException, NoopRecordException { Preconditions.checkState(open, "File closed"); RandomAccessFile fileHandle = checkOut(); boolean error = true; try { fileHandle.seek(offset); byte operation = fileHandle.readByte(); - if(operation == OP_NOOP) { + if (operation == OP_NOOP) { throw new NoopRecordException("No op record found. Corrupt record " + - "may have been repaired by File Channel Integrity tool"); + "may have been repaired by File Channel Integrity tool"); } if (operation != OP_RECORD) { throw new CorruptEventException( - "Operation code is invalid. File " + - "is corrupt. Please run File Channel Integrity tool."); + "Operation code is invalid. File " + + "is corrupt. Please run File Channel Integrity tool."); } TransactionEventRecord record = doGet(fileHandle); - if(!(record instanceof Put)) { + if (!(record instanceof Put)) { Preconditions.checkState(false, "Record is " + record.getClass().getSimpleName()); } error = false; - return ((Put)record).getEvent(); + return ((Put) record).getEvent(); } finally { - if(error) { + if (error) { close(fileHandle, file); } else { checkIn(fileHandle); @@ -499,12 +515,12 @@ public abstract class LogFile { } synchronized void close() { - if(open) { + if (open) { open = false; LOG.info("Closing RandomReader " + file); List<RandomAccessFile> fileHandles = Lists.newArrayList(); - while(readFileHandles.drainTo(fileHandles) > 0) { - for(RandomAccessFile fileHandle : fileHandles) { + while (readFileHandles.drainTo(fileHandles) > 0) { + for (RandomAccessFile fileHandle : fileHandles) { synchronized (fileHandle) { try { fileHandle.close(); @@ -528,7 +544,7 @@ public abstract class LogFile { } private void checkIn(RandomAccessFile fileHandle) { - if(!readFileHandles.offer(fileHandle)) { + if (!readFileHandles.offer(fileHandle)) { close(fileHandle, file); } } @@ -536,19 +552,20 @@ public abstract class LogFile { private RandomAccessFile checkOut() throws IOException, InterruptedException { RandomAccessFile fileHandle = readFileHandles.poll(); - if(fileHandle != null) { + if (fileHandle != null) { return fileHandle; } int remaining = readFileHandles.remainingCapacity(); - if(remaining > 0) { + if (remaining > 0) { LOG.info("Opening " + file + " for read, remaining number of file " + - "handles available for reads of this file is " + remaining); + "handles available for reads of this file is " + remaining); return open(); } return readFileHandles.take(); } + private static void close(RandomAccessFile fileHandle, File file) { - if(fileHandle != null) { + if (fileHandle != null) { try { fileHandle.close(); } catch (IOException e) { @@ -558,7 +575,7 @@ public abstract class LogFile { } } - public static abstract class SequentialReader { + public abstract static class SequentialReader { private final RandomAccessFile fileHandle; private final FileChannel fileChannel; @@ -573,8 +590,9 @@ public abstract class LogFile { /** * Construct a Sequential Log Reader object + * * @param file - * @throws IOException if an I/O error occurs + * @throws IOException if an I/O error occurs * @throws EOFException if the file is empty */ SequentialReader(File file, @Nullable KeyProvider encryptionKeyProvider) @@ -584,6 +602,7 @@ public abstract class LogFile { fileHandle = new RandomAccessFile(file, "r"); fileChannel = fileHandle.getChannel(); } + abstract LogRecord doNext(int offset) throws IOException, CorruptEventException; abstract int getVersion(); @@ -591,50 +610,57 @@ public abstract class LogFile { protected void setLastCheckpointPosition(long lastCheckpointPosition) { this.lastCheckpointPosition = lastCheckpointPosition; } + protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) { this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID; } + protected void setPreviousCheckpointPosition( - long backupCheckpointPosition) { + long backupCheckpointPosition) { this.backupCheckpointPosition = backupCheckpointPosition; } + protected void setPreviousCheckpointWriteOrderID( - long backupCheckpointWriteOrderID) { + long backupCheckpointWriteOrderID) { this.backupCheckpointWriteOrderID = backupCheckpointWriteOrderID; } + protected void setLogFileID(int logFileID) { this.logFileID = logFileID; Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: " + Integer.toHexString(logFileID)); } + protected KeyProvider getKeyProvider() { return encryptionKeyProvider; } + protected RandomAccessFile getFileHandle() { return fileHandle; } + int getLogFileID() { return logFileID; } void skipToLastCheckpointPosition(long checkpointWriteOrderID) - throws IOException { + throws IOException { if (lastCheckpointPosition > 0L) { long position = 0; if (lastCheckpointWriteOrderID <= checkpointWriteOrderID) { position = lastCheckpointPosition; } else if (backupCheckpointWriteOrderID <= checkpointWriteOrderID - && backupCheckpointPosition > 0) { + && backupCheckpointPosition > 0) { position = backupCheckpointPosition; } fileChannel.position(position); LOG.info("fast-forward to checkpoint position: " + position); } else { LOG.info("Checkpoint for file(" + file.getAbsolutePath() + ") " - + "is: " + lastCheckpointWriteOrderID + ", which is beyond the " - + "requested checkpoint time: " + checkpointWriteOrderID - + " and position " + lastCheckpointPosition); + + "is: " + lastCheckpointWriteOrderID + ", which is beyond the " + + "requested checkpoint time: " + checkpointWriteOrderID + + " and position " + lastCheckpointPosition); } } @@ -644,8 +670,8 @@ public abstract class LogFile { long position = fileChannel.position(); if (position > FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) { LOG.info("File position exceeds the threshold: " - + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE - + ", position: " + position); + + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE + + ", position: " + position); } offset = (int) position; Preconditions.checkState(offset >= 0); @@ -658,21 +684,21 @@ public abstract class LogFile { return null; } else if (operation == OP_NOOP) { LOG.info("No op event found in file: " + file.toString() + - " at " + offset + ". Skipping event."); + " at " + offset + ". Skipping event."); skipRecord(fileHandle, offset + 1); offset = (int) fileHandle.getFilePointer(); continue; } else { LOG.error("Encountered non op-record at " + offset + " " + - Integer.toHexString(operation) + " in " + file); + Integer.toHexString(operation) + " in " + file); return null; } } - if(offset >= fileHandle.length()) { + if (offset >= fileHandle.length()) { return null; } return doNext(offset); - } catch(EOFException e) { + } catch (EOFException e) { return null; } catch (IOException e) { throw new IOException("Unable to read next Transaction from log file " + @@ -683,33 +709,36 @@ public abstract class LogFile { public long getPosition() throws IOException { return fileChannel.position(); } + public void close() { - if(fileHandle != null) { + if (fileHandle != null) { try { fileHandle.close(); - } catch (IOException e) {} + } catch (IOException e) { + } } } } + protected static void writeDelimitedBuffer(ByteBuffer output, ByteBuffer buffer) throws IOException { output.putInt(buffer.limit()); output.put(buffer); } + protected static byte[] readDelimitedBuffer(RandomAccessFile fileHandle) throws IOException, CorruptEventException { int length = fileHandle.readInt(); if (length < 0) { - throw new CorruptEventException("Length of event is: " + String.valueOf - (length) + ". Event must have length >= 0. Possible corruption of " + - "data or partial fsync."); + throw new CorruptEventException("Length of event is: " + String.valueOf(length) + + ". Event must have length >= 0. Possible corruption of data or partial fsync."); } byte[] buffer = new byte[length]; try { fileHandle.readFully(buffer); } catch (EOFException ex) { throw new CorruptEventException("Remaining data in file less than " + - "expected size of event.", ex); + "expected size of event.", ex); } return buffer; }
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java index 7d7fd85..f2fcad6 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java @@ -18,24 +18,23 @@ */ package org.apache.flume.channel.file; +import com.google.common.base.Preconditions; +import org.apache.flume.channel.file.encryption.KeyProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.security.Key; -import javax.annotation.Nullable; - -import org.apache.flume.channel.file.encryption.KeyProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - @SuppressWarnings("deprecation") class LogFileFactory { private static final Logger LOGGER = LoggerFactory.getLogger(LogFileFactory.class); + private LogFileFactory() {} static LogFile.MetaDataWriter getMetaDataWriter(File file, int logFileID) @@ -43,21 +42,21 @@ class LogFileFactory { RandomAccessFile logFile = null; try { File metaDataFile = Serialization.getMetaDataFile(file); - if(metaDataFile.exists()) { + if (metaDataFile.exists()) { return new LogFileV3.MetaDataWriter(file, logFileID); } logFile = new RandomAccessFile(file, "r"); int version = logFile.readInt(); - if(Serialization.VERSION_2 == version) { + if (Serialization.VERSION_2 == version) { return new LogFileV2.MetaDataWriter(file, logFileID); } throw new IOException("File " + file + " has bad version " + Integer.toHexString(version)); } finally { - if(logFile != null) { + if (logFile != null) { try { logFile.close(); - } catch(IOException e) { + } catch (IOException e) { LOGGER.warn("Unable to close " + file, e); } } @@ -65,13 +64,13 @@ class LogFileFactory { } static LogFile.Writer getWriter(File file, int logFileID, - long maxFileSize, @Nullable Key encryptionKey, - @Nullable String encryptionKeyAlias, - @Nullable String encryptionCipherProvider, - long usableSpaceRefreshInterval, boolean fsyncPerTransaction, - int fsyncInterval) throws IOException { - Preconditions.checkState(!file.exists(), "File already exists " + - file.getAbsolutePath()); + long maxFileSize, @Nullable Key encryptionKey, + @Nullable String encryptionKeyAlias, + @Nullable String encryptionCipherProvider, + long usableSpaceRefreshInterval, boolean fsyncPerTransaction, + int fsyncInterval) throws IOException { + Preconditions.checkState(!file.exists(), "File already exists " + + file.getAbsolutePath()); Preconditions.checkState(file.createNewFile(), "File could not be created " + file.getAbsolutePath()); return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey, @@ -80,28 +79,29 @@ class LogFileFactory { } static LogFile.RandomReader getRandomReader(File file, - @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction) + @Nullable KeyProvider encryptionKeyProvider, + boolean fsyncPerTransaction) throws IOException { RandomAccessFile logFile = new RandomAccessFile(file, "r"); try { File metaDataFile = Serialization.getMetaDataFile(file); // either this is a rr for a just created file or // the metadata file exists and as such it's V3 - if(logFile.length() == 0L || metaDataFile.exists()) { + if (logFile.length() == 0L || metaDataFile.exists()) { return new LogFileV3.RandomReader(file, encryptionKeyProvider, - fsyncPerTransaction); + fsyncPerTransaction); } int version = logFile.readInt(); - if(Serialization.VERSION_2 == version) { + if (Serialization.VERSION_2 == version) { return new LogFileV2.RandomReader(file); } throw new IOException("File " + file + " has bad version " + Integer.toHexString(version)); } finally { - if(logFile != null) { + if (logFile != null) { try { logFile.close(); - } catch(IOException e) { + } catch (IOException e) { LOGGER.warn("Unable to close " + file, e); } } @@ -109,7 +109,8 @@ class LogFileFactory { } static LogFile.SequentialReader getSequentialReader(File file, - @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction) + @Nullable KeyProvider encryptionKeyProvider, + boolean fsyncPerTransaction) throws IOException { RandomAccessFile logFile = null; try { @@ -134,27 +135,27 @@ class LogFileFactory { hasMeta = true; } else { throw new IOException("Renaming of " + tempMetadataFile.getName() - + " to " + metaDataFile.getName() + " failed"); + + " to " + metaDataFile.getName() + " failed"); } } else if (oldMetadataFile.exists()) { if (oldMetadataFile.renameTo(metaDataFile)) { hasMeta = true; } else { throw new IOException("Renaming of " + oldMetadataFile.getName() - + " to " + metaDataFile.getName() + " failed"); + + " to " + metaDataFile.getName() + " failed"); } } if (hasMeta) { // Now the metadata file has been found, delete old or temp files // so it does not interfere with normal operation. - if(oldMetadataFile.exists()) { + if (oldMetadataFile.exists()) { oldMetadataFile.delete(); } - if(tempMetadataFile.exists()) { + if (tempMetadataFile.exists()) { tempMetadataFile.delete(); } - if(metaDataFile.length() == 0L) { - if(file.length() != 0L) { + if (metaDataFile.length() == 0L) { + if (file.length() != 0L) { String msg = String.format("MetaData file %s is empty, but log %s" + " is of size %d", metaDataFile, file, file.length()); throw new IllegalStateException(msg); @@ -163,20 +164,20 @@ class LogFileFactory { metaDataFile)); } return new LogFileV3.SequentialReader(file, encryptionKeyProvider, - fsyncPerTransaction); + fsyncPerTransaction); } logFile = new RandomAccessFile(file, "r"); int version = logFile.readInt(); - if(Serialization.VERSION_2 == version) { + if (Serialization.VERSION_2 == version) { return new LogFileV2.SequentialReader(file); } throw new IOException("File " + file + " has bad version " + Integer.toHexString(version)); } finally { - if(logFile != null) { + if (logFile != null) { try { logFile.close(); - } catch(IOException e) { + } catch (IOException e) { LOGGER.warn("Unable to close " + file, e); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java index 9447652..b0377ab 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java @@ -22,12 +22,15 @@ import java.io.IOException; public class LogFileRetryableIOException extends IOException { private static final long serialVersionUID = -2747112999806160431L; + public LogFileRetryableIOException() { super(); } + public LogFileRetryableIOException(String msg) { super(msg); } + public LogFileRetryableIOException(String msg, Throwable t) { super(msg, t); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java index bb25e95..62b8cb9 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java @@ -65,7 +65,7 @@ class LogFileV2 extends LogFile { + ", logWriteOrderID: " + getLastCheckpointWriteOrderID()); error = false; } finally { - if(error) { + if (error) { close(); } } @@ -108,6 +108,7 @@ class LogFileV2 extends LogFile { getFileChannel().force(true); } + @Override int getVersion() { return Serialization.VERSION_2; @@ -115,29 +116,27 @@ class LogFileV2 extends LogFile { } static class RandomReader extends LogFile.RandomReader { - RandomReader(File file) - throws IOException { + RandomReader(File file) throws IOException { super(file, null, true); } + @Override int getVersion() { return Serialization.VERSION_2; } + @Override - protected TransactionEventRecord doGet(RandomAccessFile fileHandle) - throws IOException { + protected TransactionEventRecord doGet(RandomAccessFile fileHandle) throws IOException { return TransactionEventRecord.fromDataInputV2(fileHandle); } } static class SequentialReader extends LogFile.SequentialReader { - - SequentialReader(File file) - throws EOFException, IOException { + SequentialReader(File file) throws EOFException, IOException { super(file, null); RandomAccessFile fileHandle = getFileHandle(); int version = fileHandle.readInt(); - if(version != getVersion()) { + if (version != getVersion()) { throw new IOException("Version is " + Integer.toHexString(version) + " expected " + Integer.toHexString(getVersion()) + " file: " + file.getCanonicalPath()); @@ -146,10 +145,12 @@ class LogFileV2 extends LogFile { setLastCheckpointPosition(fileHandle.readLong()); setLastCheckpointWriteOrderID(fileHandle.readLong()); } + @Override public int getVersion() { return Serialization.VERSION_2; } + @Override LogRecord doNext(int offset) throws IOException { TransactionEventRecord event = http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index 9b0ef93..b459947 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -21,7 +21,6 @@ package org.apache.flume.channel.file; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessage; -import org.apache.flume.Transaction; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.encryption.CipherProvider; @@ -58,13 +57,14 @@ public class LogFileV3 extends LogFile { static class MetaDataWriter extends LogFile.MetaDataWriter { private ProtosFactory.LogFileMetaData logFileMetaData; private final File metaDataFile; + protected MetaDataWriter(File logFile, int logFileID) throws IOException { super(logFile, logFileID); metaDataFile = Serialization.getMetaDataFile(logFile); MetaDataReader metaDataReader = new MetaDataReader(logFile, logFileID); logFileMetaData = metaDataReader.read(); int version = logFileMetaData.getVersion(); - if(version != getVersion()) { + if (version != getVersion()) { throw new IOException("Version is " + Integer.toHexString(version) + " expected " + Integer.toHexString(getVersion()) + " file: " + logFile); @@ -90,9 +90,9 @@ public class LogFileV3 extends LogFile { * would be possible to recover from a backup. */ metaDataBuilder.setBackupCheckpointPosition(logFileMetaData - .getCheckpointPosition()); + .getCheckpointPosition()); metaDataBuilder.setBackupCheckpointWriteOrderID(logFileMetaData - .getCheckpointWriteOrderID()); + .getCheckpointWriteOrderID()); logFileMetaData = metaDataBuilder.build(); writeDelimitedTo(logFileMetaData, metaDataFile); } @@ -102,17 +102,19 @@ public class LogFileV3 extends LogFile { private final File logFile; private final File metaDataFile; private final int logFileID; + protected MetaDataReader(File logFile, int logFileID) throws IOException { this.logFile = logFile; metaDataFile = Serialization.getMetaDataFile(logFile); this.logFileID = logFileID; } + ProtosFactory.LogFileMetaData read() throws IOException { FileInputStream inputStream = new FileInputStream(metaDataFile); try { ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull( - ProtosFactory.LogFileMetaData. - parseDelimitedFrom(inputStream), "Metadata cannot be null"); + ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream), + "Metadata cannot be null"); if (metaData.getLogFileID() != logFileID) { throw new IOException("The file id of log file: " + logFile + " is different from expected " @@ -123,7 +125,7 @@ public class LogFileV3 extends LogFile { } finally { try { inputStream.close(); - } catch(IOException e) { + } catch (IOException e) { LOGGER.warn("Unable to close " + metaDataFile, e); } } @@ -133,13 +135,14 @@ public class LogFileV3 extends LogFile { /** * Writes a GeneratedMessage to a temp file, synchronizes it to disk * and then renames the file over file. - * @param msg GeneratedMessage to write to the file + * + * @param msg GeneratedMessage to write to the file * @param file destination file * @throws IOException if a write error occurs or the File.renameTo - * method returns false meaning the file could not be overwritten. + * method returns false meaning the file could not be overwritten. */ public static void writeDelimitedTo(GeneratedMessage msg, File file) - throws IOException { + throws IOException { File tmp = Serialization.getMetaDataTempFile(file); FileOutputStream outputStream = new FileOutputStream(tmp); boolean closed = false; @@ -148,26 +151,26 @@ public class LogFileV3 extends LogFile { outputStream.getChannel().force(true); outputStream.close(); closed = true; - if(!tmp.renameTo(file)) { + if (!tmp.renameTo(file)) { //Some platforms don't support moving over an existing file. //So: //log.meta -> log.meta.old //log.meta.tmp -> log.meta //delete log.meta.old File oldFile = Serialization.getOldMetaDataFile(file); - if(!file.renameTo(oldFile)){ + if (!file.renameTo(oldFile)) { throw new IOException("Unable to rename " + file + " to " + oldFile); } - if(!tmp.renameTo(file)) { + if (!tmp.renameTo(file)) { throw new IOException("Unable to rename " + tmp + " over " + file); } oldFile.delete(); } } finally { - if(!closed) { + if (!closed) { try { outputStream.close(); - } catch(IOException e) { + } catch (IOException e) { LOGGER.warn("Unable to close " + tmp, e); } } @@ -177,17 +180,17 @@ public class LogFileV3 extends LogFile { static class Writer extends LogFile.Writer { Writer(File file, int logFileID, long maxFileSize, - @Nullable Key encryptionKey, - @Nullable String encryptionKeyAlias, - @Nullable String encryptionCipherProvider, - long usableSpaceRefreshInterval, boolean fsyncPerTransaction, - int fsyncInterval) throws IOException { - super(file, logFileID, maxFileSize, CipherProviderFactory. - getEncrypter(encryptionCipherProvider, encryptionKey), - usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval); + @Nullable Key encryptionKey, + @Nullable String encryptionKeyAlias, + @Nullable String encryptionCipherProvider, + long usableSpaceRefreshInterval, boolean fsyncPerTransaction, + int fsyncInterval) throws IOException { + super(file, logFileID, maxFileSize, + CipherProviderFactory.getEncrypter(encryptionCipherProvider, encryptionKey), + usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval); ProtosFactory.LogFileMetaData.Builder metaDataBuilder = ProtosFactory.LogFileMetaData.newBuilder(); - if(encryptionKey != null) { + if (encryptionKey != null) { Preconditions.checkNotNull(encryptionKeyAlias, "encryptionKeyAlias"); Preconditions.checkNotNull(encryptionCipherProvider, "encryptionCipherProvider"); @@ -208,6 +211,7 @@ public class LogFileV3 extends LogFile { File metaDataFile = Serialization.getMetaDataFile(file); writeDelimitedTo(metaDataBuilder.build(), metaDataFile); } + @Override int getVersion() { return Serialization.VERSION_3; @@ -221,28 +225,29 @@ public class LogFileV3 extends LogFile { private volatile String cipherProvider; private volatile byte[] parameters; private BlockingQueue<CipherProvider.Decryptor> decryptors = - new LinkedBlockingDeque<CipherProvider.Decryptor>(); + new LinkedBlockingDeque<CipherProvider.Decryptor>(); RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider, - boolean fsyncPerTransaction) throws IOException { + boolean fsyncPerTransaction) throws IOException { super(file, encryptionKeyProvider, fsyncPerTransaction); } + private void initialize() throws IOException { File metaDataFile = Serialization.getMetaDataFile(getFile()); FileInputStream inputStream = new FileInputStream(metaDataFile); try { - ProtosFactory.LogFileMetaData metaData = - Preconditions.checkNotNull(ProtosFactory.LogFileMetaData. - parseDelimitedFrom(inputStream), "MetaData cannot be null"); + ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull( + ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream), + "MetaData cannot be null"); int version = metaData.getVersion(); - if(version != getVersion()) { + if (version != getVersion()) { throw new IOException("Version is " + Integer.toHexString(version) + " expected " + Integer.toHexString(getVersion()) + " file: " + getFile().getCanonicalPath()); } encryptionEnabled = false; - if(metaData.hasEncryption()) { - if(getKeyProvider() == null) { + if (metaData.hasEncryption()) { + if (getKeyProvider() == null) { throw new IllegalStateException("Data file is encrypted but no " + " provider was specified"); } @@ -255,23 +260,26 @@ public class LogFileV3 extends LogFile { } finally { try { inputStream.close(); - } catch(IOException e) { + } catch (IOException e) { LOGGER.warn("Unable to close " + metaDataFile, e); } } } + private CipherProvider.Decryptor getDecryptor() { CipherProvider.Decryptor decryptor = decryptors.poll(); - if(decryptor == null) { + if (decryptor == null) { decryptor = CipherProviderFactory.getDecrypter(cipherProvider, key, parameters); } return decryptor; } + @Override int getVersion() { return Serialization.VERSION_3; } + @Override protected TransactionEventRecord doGet(RandomAccessFile fileHandle) throws IOException, CorruptEventException { @@ -279,7 +287,7 @@ public class LogFileV3 extends LogFile { // empty. As such we wait to initialize until there is some // data before we we initialize synchronized (this) { - if(!initialized) { + if (!initialized) { initialized = true; initialize(); } @@ -288,18 +296,17 @@ public class LogFileV3 extends LogFile { CipherProvider.Decryptor decryptor = null; try { byte[] buffer = readDelimitedBuffer(fileHandle); - if(encryptionEnabled) { + if (encryptionEnabled) { decryptor = getDecryptor(); buffer = decryptor.decrypt(buffer); } - TransactionEventRecord event = TransactionEventRecord. - fromByteArray(buffer); + TransactionEventRecord event = TransactionEventRecord.fromByteArray(buffer); success = true; return event; - } catch(DecryptionFailureException ex) { + } catch (DecryptionFailureException ex) { throw new CorruptEventException("Error decrypting event", ex); } finally { - if(success && encryptionEnabled && decryptor != null) { + if (success && encryptionEnabled && decryptor != null) { decryptors.offer(decryptor); } } @@ -309,9 +316,10 @@ public class LogFileV3 extends LogFile { public static class SequentialReader extends LogFile.SequentialReader { private CipherProvider.Decryptor decryptor; private final boolean fsyncPerTransaction; + public SequentialReader(File file, @Nullable KeyProvider - encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException, - IOException { + encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException, + IOException { super(file, encryptionKeyProvider); this.fsyncPerTransaction = fsyncPerTransaction; File metaDataFile = Serialization.getMetaDataFile(file); @@ -321,32 +329,31 @@ public class LogFileV3 extends LogFile { ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream), "MetaData cannot be null"); int version = metaData.getVersion(); - if(version != getVersion()) { + if (version != getVersion()) { throw new IOException("Version is " + Integer.toHexString(version) + " expected " + Integer.toHexString(getVersion()) + " file: " + file.getCanonicalPath()); } - if(metaData.hasEncryption()) { - if(getKeyProvider() == null) { + if (metaData.hasEncryption()) { + if (getKeyProvider() == null) { throw new IllegalStateException("Data file is encrypted but no " + " provider was specified"); } ProtosFactory.LogFileEncryption encryption = metaData.getEncryption(); Key key = getKeyProvider().getKey(encryption.getKeyAlias()); - decryptor = CipherProviderFactory. - getDecrypter(encryption.getCipherProvider(), key, - encryption.getParameters().toByteArray()); + decryptor = CipherProviderFactory.getDecrypter( + encryption.getCipherProvider(), key, encryption.getParameters().toByteArray()); } setLogFileID(metaData.getLogFileID()); setLastCheckpointPosition(metaData.getCheckpointPosition()); setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID()); setPreviousCheckpointPosition(metaData.getBackupCheckpointPosition()); setPreviousCheckpointWriteOrderID( - metaData.getBackupCheckpointWriteOrderID()); + metaData.getBackupCheckpointWriteOrderID()); } finally { try { inputStream.close(); - } catch(IOException e) { + } catch (IOException e) { LOGGER.warn("Unable to close " + metaDataFile, e); } } @@ -359,7 +366,7 @@ public class LogFileV3 extends LogFile { @Override LogRecord doNext(int offset) throws IOException, CorruptEventException, - DecryptionFailureException { + DecryptionFailureException { byte[] buffer = null; TransactionEventRecord event = null; try { @@ -370,7 +377,7 @@ public class LogFileV3 extends LogFile { event = TransactionEventRecord.fromByteArray(buffer); } catch (CorruptEventException ex) { LOGGER.warn("Corrupt file found. File id: log-" + this.getLogFileID(), - ex); + ex); // Return null so that replay handler thinks all events in this file // have been taken. if (!fsyncPerTransaction) { @@ -380,7 +387,7 @@ public class LogFileV3 extends LogFile { } catch (DecryptionFailureException ex) { if (!fsyncPerTransaction) { LOGGER.warn("Could not decrypt even read from channel. Skipping " + - "event.", ex); + "event.", ex); return null; } throw ex; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java index 19ad0d6..5a75627 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java @@ -20,15 +20,17 @@ package org.apache.flume.channel.file; import java.util.Arrays; - public class LogRecord implements Comparable<LogRecord> { - private int fileID, offset; + private int fileID; + private int offset; private TransactionEventRecord event; + public LogRecord(int fileID, int offset, TransactionEventRecord event) { this.fileID = fileID; this.offset = offset; this.event = event; } + public int getFileID() { return fileID; } @@ -41,20 +43,16 @@ public class LogRecord implements Comparable<LogRecord> { @Override public int compareTo(LogRecord o) { - int result = new Long(event.getLogWriteOrderID()) - .compareTo(o.getEvent().getLogWriteOrderID()); - if(result == 0) { + int result = new Long(event.getLogWriteOrderID()).compareTo(o.getEvent().getLogWriteOrderID()); + if (result == 0) { // oops we have hit a flume-1.2 bug. let's try and use the txid // to replay the events - result = new Long(event.getTransactionID()) - .compareTo(o.getEvent().getTransactionID()); - if(result == 0) { + result = new Long(event.getTransactionID()).compareTo(o.getEvent().getTransactionID()); + if (result == 0) { // events are within the same transaction. Basically we want commit // and rollback to come after take and put - Integer thisIndex = Arrays.binarySearch(replaySortOrder, - event.getRecordType()); - Integer thatIndex = Arrays.binarySearch(replaySortOrder, - o.getEvent().getRecordType()); + Integer thisIndex = Arrays.binarySearch(replaySortOrder, event.getRecordType()); + Integer thatIndex = Arrays.binarySearch(replaySortOrder, o.getEvent().getRecordType()); return thisIndex.compareTo(thatIndex); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java index d1498c2..48177d0 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java @@ -64,7 +64,7 @@ public class LogUtils { static List<File> getLogs(File logDir) { List<File> result = Lists.newArrayList(); File[] files = logDir.listFiles(); - if(files == null) { + if (files == null) { String msg = logDir + ".listFiles() returned null: "; msg += "File = " + logDir.isFile() + ", "; msg += "Exists = " + logDir.exists() + ", "; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java index dfcdd73..b74ff7b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java @@ -19,19 +19,21 @@ package org.apache.flume.channel.file; class Pair<L,R> { - private final L left; private final R right; + Pair(L l, R r) { left = l; right = r; } + L getLeft() { return left; } R getRight() { return right; } + static <L, R> Pair<L, R> of(L left, R right) { return new Pair<L, R>(left, right); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java index f08f024..0a70a24 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java @@ -78,8 +78,8 @@ class Put extends TransactionEventRecord { Map<String, String> headers = event.getHeaders(); ProtosFactory.FlumeEventHeader.Builder headerBuilder = ProtosFactory.FlumeEventHeader.newBuilder(); - if(headers != null) { - for(String key : headers.keySet()) { + if (headers != null) { + for (String key : headers.keySet()) { String value = headers.get(key); headerBuilder.clear(); eventBuilder.addHeaders(headerBuilder.setKey(key) @@ -93,13 +93,12 @@ class Put extends TransactionEventRecord { putBuilder.build().writeDelimitedTo(out); } @Override - void readProtos(InputStream in) throws IOException, - CorruptEventException { - ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory. - Put.parseDelimitedFrom(in), "Put cannot be null"); + void readProtos(InputStream in) throws IOException, CorruptEventException { + ProtosFactory.Put put = Preconditions.checkNotNull( + ProtosFactory.Put.parseDelimitedFrom(in), "Put cannot be null"); Map<String, String> headers = Maps.newHashMap(); ProtosFactory.FlumeEvent protosEvent = put.getEvent(); - for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) { + for (ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) { headers.put(header.getKey(), header.getValue()); } byte[] eventBody = protosEvent.getBody().toByteArray(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java index a559503..662fd42 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java @@ -99,8 +99,8 @@ class ReplayHandler { } ReplayHandler(FlumeEventQueue queue, - @Nullable KeyProvider encryptionKeyProvider, - boolean fsyncPerTransaction) { + @Nullable KeyProvider encryptionKeyProvider, + boolean fsyncPerTransaction) { this.queue = queue; this.lastCheckpoint = queue.getLogWriteOrderID(); pendingTakes = Lists.newArrayList(); @@ -109,6 +109,7 @@ class ReplayHandler { this.encryptionKeyProvider = encryptionKeyProvider; this.fsyncPerTransaction = fsyncPerTransaction; } + /** * Replay logic from Flume1.2 which can be activated if the v2 logic * is failing on ol logs for some reason. @@ -165,9 +166,8 @@ class ReplayHandler { commitCount++; @SuppressWarnings("unchecked") Collection<FlumeEventPointer> pointers = - (Collection<FlumeEventPointer>) transactionMap.remove(trans); - if (((Commit) record).getType() - == TransactionEventRecord.Type.TAKE.get()) { + (Collection<FlumeEventPointer>) transactionMap.remove(trans); + if (((Commit) record).getType() == TransactionEventRecord.Type.TAKE.get()) { if (inflightTakes.containsKey(trans)) { if (pointers == null) { pointers = Sets.newHashSet(); @@ -185,8 +185,8 @@ class ReplayHandler { count += pointers.size(); } } else { - Preconditions.checkArgument(false, "Unknown record type: " - + Integer.toHexString(type)); + Preconditions.checkArgument(false, + "Unknown record type: " + Integer.toHexString(type)); } } else { @@ -196,8 +196,8 @@ class ReplayHandler { LOG.info("Replayed " + count + " from " + log); if (LOG.isDebugEnabled()) { LOG.debug("read: " + readCount + ", put: " + putCount + ", take: " - + takeCount + ", rollback: " + rollbackCount + ", commit: " - + commitCount + ", skipp: " + skipCount); + + takeCount + ", rollback: " + rollbackCount + ", commit: " + + commitCount + ", skipp: " + skipCount); } } catch (EOFException e) { LOG.warn("Hit EOF on " + log); @@ -262,21 +262,20 @@ class ReplayHandler { LOG.info("Replaying " + log); try { LogFile.SequentialReader reader = - LogFileFactory.getSequentialReader(log, encryptionKeyProvider, - fsyncPerTransaction); + LogFileFactory.getSequentialReader(log, encryptionKeyProvider, fsyncPerTransaction); reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID()); Preconditions.checkState(!readers.containsKey(reader.getLogFileID()), "Readers " + readers + " already contains " + reader.getLogFileID()); readers.put(reader.getLogFileID(), reader); LogRecord logRecord = reader.next(); - if(logRecord == null) { + if (logRecord == null) { readers.remove(reader.getLogFileID()); reader.close(); } else { logRecordBuffer.add(logRecord); } - } catch(EOFException e) { + } catch (EOFException e) { LOG.warn("Ignoring " + log + " due to EOF", e); } } @@ -294,7 +293,7 @@ class ReplayHandler { writeOrderIDSeed = Math.max(writeOrderIDSeed, record.getLogWriteOrderID()); readCount++; - if(readCount % 10000 == 0 && readCount > 0) { + if (readCount % 10000 == 0 && readCount > 0) { LOG.info("read: " + readCount + ", put: " + putCount + ", take: " + takeCount + ", rollback: " + rollbackCount + ", commit: " + commitCount + ", skip: " + skipCount + ", eventCount:" + count); @@ -316,11 +315,11 @@ class ReplayHandler { commitCount++; @SuppressWarnings("unchecked") Collection<FlumeEventPointer> pointers = - (Collection<FlumeEventPointer>) transactionMap.remove(trans); + (Collection<FlumeEventPointer>) transactionMap.remove(trans); if (((Commit) record).getType() == TransactionEventRecord.Type.TAKE.get()) { if (inflightTakes.containsKey(trans)) { - if(pointers == null){ + if (pointers == null) { pointers = Sets.newHashSet(); } Set<Long> takes = inflightTakes.removeAll(trans); @@ -350,8 +349,8 @@ class ReplayHandler { } finally { TransactionIDOracle.setSeed(transactionIDSeed); WriteOrderOracle.setSeed(writeOrderIDSeed); - for(LogFile.SequentialReader reader : readers.values()) { - if(reader != null) { + for (LogFile.SequentialReader reader : readers.values()) { + if (reader != null) { reader.close(); } } @@ -378,11 +377,11 @@ class ReplayHandler { } private LogRecord next() throws IOException, CorruptEventException { LogRecord resultLogRecord = logRecordBuffer.poll(); - if(resultLogRecord != null) { + if (resultLogRecord != null) { // there is more log records to read LogFile.SequentialReader reader = readers.get(resultLogRecord.getFileID()); LogRecord nextLogRecord; - if((nextLogRecord = reader.next()) != null) { + if ((nextLogRecord = reader.next()) != null) { logRecordBuffer.add(nextLogRecord); } } @@ -391,7 +390,7 @@ class ReplayHandler { private void processCommit(short type, Collection<FlumeEventPointer> pointers) { if (type == TransactionEventRecord.Type.PUT.get()) { for (FlumeEventPointer pointer : pointers) { - if(!queue.addTail(pointer)) { + if (!queue.addTail(pointer)) { throw new IllegalStateException("Unable to add " + pointer + ". Queue depth = " + queue.getSize() + ", Capacity = " + queue.getCapacity()); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java index 335ad0b..2fca755 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java @@ -35,6 +35,7 @@ class Rollback extends TransactionEventRecord { Rollback(Long transactionID, Long logWriteOrderID) { super(transactionID, logWriteOrderID); } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); @@ -44,22 +45,26 @@ class Rollback extends TransactionEventRecord { public void write(DataOutput out) throws IOException { super.write(out); } + @Override void writeProtos(OutputStream out) throws IOException { ProtosFactory.Rollback.Builder rollbackBuilder = ProtosFactory.Rollback.newBuilder(); rollbackBuilder.build().writeDelimitedTo(out); } + @Override void readProtos(InputStream in) throws IOException { @SuppressWarnings("unused") - ProtosFactory.Rollback rollback = Preconditions.checkNotNull(ProtosFactory. - Rollback.parseDelimitedFrom(in), "Rollback cannot be null"); + ProtosFactory.Rollback rollback = Preconditions.checkNotNull( + ProtosFactory.Rollback.parseDelimitedFrom(in), "Rollback cannot be null"); } + @Override short getRecordType() { return Type.ROLLBACK.get(); } + @Override public String toString() { StringBuilder builder = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java index a6eda75..19303cc 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java @@ -46,14 +46,12 @@ public class Serialization { static final long SIZE_OF_INT = 4; static final int SIZE_OF_LONG = 8; - static final int VERSION_2 = 2; static final int VERSION_3 = 3; public static final String METADATA_FILENAME = ".meta"; public static final String METADATA_TMP_FILENAME = ".tmp"; - public static final String OLD_METADATA_FILENAME = METADATA_FILENAME + - ".old"; + public static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old"; // 64 K buffer to copy and compress files. private static final int FILE_BUFFER_SIZE = 64 * 1024; @@ -63,12 +61,11 @@ public class Serialization { static File getMetaDataTempFile(File metaDataFile) { String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME; return new File(metaDataFile.getParentFile(), metaDataFileName); - } + static File getMetaDataFile(File file) { String metaDataFileName = file.getName() + METADATA_FILENAME; return new File(file.getParentFile(), metaDataFileName); - } // Support platforms that cannot do atomic renames - FLUME-1699 @@ -79,19 +76,20 @@ public class Serialization { /** * Deletes all files in given directory. + * * @param checkpointDir - The directory whose files are to be deleted - * @param excludes - Names of files which should not be deleted from this - * directory. + * @param excludes - Names of files which should not be deleted from this + * directory. * @return - true if all files were successfully deleted, false otherwise. */ static boolean deleteAllFiles(File checkpointDir, - @Nullable Set<String> excludes) { + @Nullable Set<String> excludes) { if (!checkpointDir.isDirectory()) { return false; } File[] files = checkpointDir.listFiles(); - if(files == null) { + if (files == null) { return false; } StringBuilder builder; @@ -100,13 +98,13 @@ public class Serialization { } else { builder = new StringBuilder("Deleted the following files: "); } - if(excludes == null) { + if (excludes == null) { excludes = Collections.emptySet(); } for (File file : files) { - if(excludes.contains(file.getName())) { + if (excludes.contains(file.getName())) { LOG.info("Skipping " + file.getName() + " because it is in excludes " + - "set"); + "set"); continue; } if (!FileUtils.deleteQuietly(file)) { @@ -125,18 +123,19 @@ public class Serialization { /** * Copy a file using a 64K size buffer. This method will copy the file and * then fsync to disk + * * @param from File to copy - this file should exist - * @param to Destination file - this file should not exist + * @param to Destination file - this file should not exist * @return true if the copy was successful */ public static boolean copyFile(File from, File to) throws IOException { Preconditions.checkNotNull(from, "Source file is null, file copy failed."); Preconditions.checkNotNull(to, "Destination file is null, " + - "file copy failed."); + "file copy failed."); Preconditions.checkState(from.exists(), "Source file: " + from.toString() + - " does not exist."); + " does not exist."); Preconditions.checkState(!to.exists(), "Destination file: " - + to.toString() + " unexpectedly exists."); + + to.toString() + " unexpectedly exists."); BufferedInputStream in = null; RandomAccessFile out = null; //use a RandomAccessFile for easy fsync @@ -145,7 +144,7 @@ public class Serialization { out = new RandomAccessFile(to, "rw"); byte[] buf = new byte[FILE_BUFFER_SIZE]; int total = 0; - while(true) { + while (true) { int read = in.read(buf); if (read == -1) { break; @@ -155,11 +154,11 @@ public class Serialization { } out.getFD().sync(); Preconditions.checkState(total == from.length(), - "The size of the origin file and destination file are not equal."); + "The size of the origin file and destination file are not equal."); return true; } catch (Exception ex) { LOG.error("Error while attempting to copy " + from.toString() + " to " - + to.toString() + ".", ex); + + to.toString() + ".", ex); Throwables.propagate(ex); } finally { Throwable th = null; @@ -185,26 +184,26 @@ public class Serialization { } // Should never reach here. throw new IOException("Copying file: " + from.toString() + " to: " + to - .toString() + " may have failed."); + .toString() + " may have failed."); } /** * Compress file using Snappy + * * @param uncompressed File to compress - this file should exist - * @param compressed Compressed file - this file should not exist + * @param compressed Compressed file - this file should not exist * @return true if compression was successful */ public static boolean compressFile(File uncompressed, File compressed) - throws IOException { + throws IOException { Preconditions.checkNotNull(uncompressed, - "Source file is null, compression failed."); + "Source file is null, compression failed."); Preconditions.checkNotNull(compressed, - "Destination file is null, compression failed."); + "Destination file is null, compression failed."); Preconditions.checkState(uncompressed.exists(), "Source file: " + - uncompressed.toString() + " does not exist."); + uncompressed.toString() + " does not exist."); Preconditions.checkState(!compressed.exists(), - "Compressed file: " + compressed.toString() + " unexpectedly " + - "exists."); + "Compressed file: " + compressed.toString() + " unexpectedly " + "exists."); BufferedInputStream in = null; FileOutputStream out = null; @@ -215,7 +214,7 @@ public class Serialization { snappyOut = new SnappyOutputStream(out); byte[] buf = new byte[FILE_BUFFER_SIZE]; - while(true) { + while (true) { int read = in.read(buf); if (read == -1) { break; @@ -226,8 +225,7 @@ public class Serialization { return true; } catch (Exception ex) { LOG.error("Error while attempting to compress " + - uncompressed.toString() + " to " + compressed.toString() - + ".", ex); + uncompressed.toString() + " to " + compressed.toString() + ".", ex); Throwables.propagate(ex); } finally { Throwable th = null; @@ -253,26 +251,24 @@ public class Serialization { } // Should never reach here. throw new IOException("Copying file: " + uncompressed.toString() - + " to: " + compressed.toString() + " may have failed."); + + " to: " + compressed.toString() + " may have failed."); } /** * Decompress file using Snappy - * @param compressed File to compress - this file should exist + * + * @param compressed File to compress - this file should exist * @param decompressed Compressed file - this file should not exist * @return true if decompression was successful */ - public static boolean decompressFile(File compressed, File decompressed) - throws IOException { - Preconditions.checkNotNull(compressed, - "Source file is null, decompression failed."); + public static boolean decompressFile(File compressed, File decompressed) throws IOException { + Preconditions.checkNotNull(compressed, "Source file is null, decompression failed."); Preconditions.checkNotNull(decompressed, "Destination file is " + - "null, decompression failed."); + "null, decompression failed."); Preconditions.checkState(compressed.exists(), "Source file: " + - compressed.toString() + " does not exist."); + compressed.toString() + " does not exist."); Preconditions.checkState(!decompressed.exists(), - "Decompressed file: " + decompressed.toString() + - " unexpectedly exists."); + "Decompressed file: " + decompressed.toString() + " unexpectedly exists."); BufferedInputStream in = null; SnappyInputStream snappyIn = null; @@ -283,7 +279,7 @@ public class Serialization { out = new FileOutputStream(decompressed); byte[] buf = new byte[FILE_BUFFER_SIZE]; - while(true) { + while (true) { int read = snappyIn.read(buf); if (read == -1) { break; @@ -294,8 +290,8 @@ public class Serialization { return true; } catch (Exception ex) { LOG.error("Error while attempting to compress " + - compressed.toString() + " to " + decompressed.toString() + - ".", ex); + compressed.toString() + " to " + decompressed.toString() + + ".", ex); Throwables.propagate(ex); } finally { Throwable th = null; @@ -321,7 +317,7 @@ public class Serialization { } // Should never reach here. throw new IOException("Decompressing file: " + - compressed.toString() + " to: " + decompressed.toString() + - " may have failed."); + compressed.toString() + " to: " + decompressed.toString() + + " may have failed."); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java index 143143a..ee7fcc8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java @@ -34,14 +34,17 @@ import com.google.common.base.Preconditions; class Take extends TransactionEventRecord { private int offset; private int fileID; + Take(Long transactionID, Long logWriteOrderID) { super(transactionID, logWriteOrderID); } + Take(Long transactionID, Long logWriteOrderID, int offset, int fileID) { this(transactionID, logWriteOrderID); this.offset = offset; this.fileID = fileID; } + int getOffset() { return offset; } @@ -70,17 +73,20 @@ class Take extends TransactionEventRecord { takeBuilder.setOffset(offset); takeBuilder.build().writeDelimitedTo(out); } + @Override void readProtos(InputStream in) throws IOException { - ProtosFactory.Take take = Preconditions.checkNotNull(ProtosFactory. - Take.parseDelimitedFrom(in), "Take cannot be null"); + ProtosFactory.Take take = Preconditions.checkNotNull( + ProtosFactory.Take.parseDelimitedFrom(in), "Take cannot be null"); fileID = take.getFileID(); offset = take.getOffset(); } + @Override short getRecordType() { return Type.TAKE.get(); } + @Override public String toString() { StringBuilder builder = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java index 1eb3f4f..0f7c3c8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java @@ -91,13 +91,16 @@ public abstract class TransactionEventRecord implements Writable { COMMIT((short)4); private short id; + Type(short id) { this.id = id; } + public short get() { return id; } } + private static final ImmutableMap<Short, Constructor<? extends TransactionEventRecord>> TYPES; static { @@ -131,11 +134,11 @@ public abstract class TransactionEventRecord implements Writable { dataOutput.flush(); // TODO toByteArray does an unneeded copy return ByteBuffer.wrap(byteOutput.toByteArray()); - } catch(IOException e) { + } catch (IOException e) { // near impossible throw Throwables.propagate(e); } finally { - if(dataOutput != null) { + if (dataOutput != null) { try { dataOutput.close(); } catch (IOException e) { @@ -149,7 +152,7 @@ public abstract class TransactionEventRecord implements Writable { static TransactionEventRecord fromDataInputV2(DataInput in) throws IOException { int header = in.readInt(); - if(header != MAGIC_HEADER) { + if (header != MAGIC_HEADER) { throw new IOException("Header " + Integer.toHexString(header) + " is not the required value: " + Integer.toHexString(MAGIC_HEADER)); } @@ -176,10 +179,10 @@ public abstract class TransactionEventRecord implements Writable { ProtosFactory.TransactionEventFooter.newBuilder().build(); footer.writeDelimitedTo(byteOutput); return ByteBuffer.wrap(byteOutput.toByteArray()); - } catch(IOException e) { + } catch (IOException e) { throw Throwables.propagate(e); } finally { - if(byteOutput != null) { + if (byteOutput != null) { try { byteOutput.close(); } catch (IOException e) { @@ -194,23 +197,19 @@ public abstract class TransactionEventRecord implements Writable { throws IOException, CorruptEventException { ByteArrayInputStream in = new ByteArrayInputStream(buffer); try { - ProtosFactory.TransactionEventHeader header = Preconditions. - checkNotNull(ProtosFactory.TransactionEventHeader. - parseDelimitedFrom(in), "Header cannot be null"); + ProtosFactory.TransactionEventHeader header = Preconditions.checkNotNull( + ProtosFactory.TransactionEventHeader.parseDelimitedFrom(in), "Header cannot be null"); short type = (short)header.getType(); long transactionID = header.getTransactionID(); long writeOrderID = header.getWriteOrderID(); - TransactionEventRecord transactionEvent = - newRecordForType(type, transactionID, writeOrderID); + TransactionEventRecord transactionEvent = newRecordForType(type, transactionID, writeOrderID); transactionEvent.readProtos(in); @SuppressWarnings("unused") ProtosFactory.TransactionEventFooter footer = Preconditions.checkNotNull( - ProtosFactory.TransactionEventFooter. - parseDelimitedFrom(in), "Footer cannot be null"); + ProtosFactory.TransactionEventFooter.parseDelimitedFrom(in), "Footer cannot be null"); return transactionEvent; } catch (InvalidProtocolBufferException ex) { - throw new CorruptEventException( - "Could not parse event from data file.", ex); + throw new CorruptEventException("Could not parse event from data file.", ex); } finally { try { in.close(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java index a9f6be6..12e5c7d 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java @@ -23,15 +23,17 @@ import java.util.concurrent.atomic.AtomicLong; public final class TransactionIDOracle { private TransactionIDOracle() {} + private static final AtomicLong TRANSACTION_ID = new AtomicLong(System.currentTimeMillis()); public static void setSeed(long highest) { long previous; - while(highest > (previous = TRANSACTION_ID.get())) { + while (highest > (previous = TRANSACTION_ID.get())) { TRANSACTION_ID.compareAndSet(previous, highest); } } + public static long next() { return TRANSACTION_ID.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java index 69072db..2ebd42d 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java @@ -75,7 +75,7 @@ class WritableUtils { long tmp = i; while (tmp != 0) { tmp = tmp >> 8; - len--; + len--; } stream.writeByte((byte)len); @@ -92,8 +92,8 @@ class WritableUtils { /** * Reads a zero-compressed encoded long from input stream and returns it. * @param stream Binary input stream - * @throws java.io.IOException * @return deserialized long from stream. + * @throws java.io.IOException */ public static long readVLong(DataInput stream) throws IOException { byte firstByte = stream.readByte(); @@ -102,7 +102,7 @@ class WritableUtils { return firstByte; } long i = 0; - for (int idx = 0; idx < len-1; idx++) { + for (int idx = 0; idx < len - 1; idx++) { byte b = stream.readByte(); i = i << 8; i = i | (b & 0xFF); @@ -113,8 +113,8 @@ class WritableUtils { /** * Reads a zero-compressed encoded integer from input stream and returns it. * @param stream Binary input stream - * @throws java.io.IOException * @return deserialized integer from stream. + * @throws java.io.IOException */ public static int readVInt(DataInput stream) throws IOException { long n = readVLong(stream); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java index dbf1c1e..b26cbb4 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java @@ -23,15 +23,17 @@ import java.util.concurrent.atomic.AtomicLong; public final class WriteOrderOracle { private WriteOrderOracle() {} + private static final AtomicLong WRITER_ORDERER = new AtomicLong(System.currentTimeMillis()); public static void setSeed(long highest) { long previous; - while(highest > (previous = WRITER_ORDERER.get())) { + while (highest > (previous = WRITER_ORDERER.get())) { WRITER_ORDERER.compareAndSet(previous, highest); } } + public static long next() { return WRITER_ORDERER.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java index 9ee4245..e1116d2 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java @@ -46,7 +46,8 @@ public class AESCTRNoPaddingProvider extends CipherProvider { } public static class EncryptorBuilder - extends CipherProvider.Encryptor.Builder<AESCTRNoPaddingEncryptor> { + extends CipherProvider.Encryptor.Builder<AESCTRNoPaddingEncryptor> { + @Override public AESCTRNoPaddingEncryptor build() { ByteBuffer buffer = ByteBuffer.allocate(16); @@ -58,9 +59,8 @@ public class AESCTRNoPaddingProvider extends CipherProvider { } } - public static class DecryptorBuilder - extends CipherProvider.Decryptor.Builder<AESCTRNoPaddingDecryptor> { + extends CipherProvider.Decryptor.Builder<AESCTRNoPaddingDecryptor> { @Override public AESCTRNoPaddingDecryptor build() { return new AESCTRNoPaddingDecryptor(key, parameters); @@ -70,18 +70,22 @@ public class AESCTRNoPaddingProvider extends CipherProvider { private static class AESCTRNoPaddingEncryptor extends Encryptor { private byte[] parameters; private Cipher cipher; + private AESCTRNoPaddingEncryptor(Key key, byte[] parameters) { this.parameters = parameters; cipher = getCipher(key, Cipher.ENCRYPT_MODE, parameters); } + @Override public byte[] getParameters() { return parameters; } + @Override public String getCodec() { return TYPE; } + @Override public byte[] encrypt(byte[] clearText) { return doFinal(cipher, clearText); @@ -90,21 +94,23 @@ public class AESCTRNoPaddingProvider extends CipherProvider { private static class AESCTRNoPaddingDecryptor extends Decryptor { private Cipher cipher; + private AESCTRNoPaddingDecryptor(Key key, byte[] parameters) { cipher = getCipher(key, Cipher.DECRYPT_MODE, parameters); } + @Override public byte[] decrypt(byte[] cipherText) { return doFinal(cipher, cipherText); } + @Override public String getCodec() { return TYPE; } } - private static byte[] doFinal(Cipher cipher, byte[] input) - throws DecryptionFailureException{ + private static byte[] doFinal(Cipher cipher, byte[] input) throws DecryptionFailureException { try { return cipher.doFinal(input); } catch (Exception e) {
