Repository: flume Updated Branches: refs/heads/trunk f15f20785 -> 69fd6b3ad
FLUME-2401. Optionally compress backup checkpoint. (Abraham Fine via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/69fd6b3a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/69fd6b3a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/69fd6b3a Branch: refs/heads/trunk Commit: 69fd6b3ad5e5b9ae6f1293b3d8e57ed57fd6701c Parents: f15f207 Author: Hari Shreedharan <[email protected]> Authored: Tue Jul 8 17:20:36 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Jul 8 17:20:36 2014 -0700 ---------------------------------------------------------------------- flume-ng-channels/flume-file-channel/pom.xml | 5 + .../file/EventQueueBackingStoreFactory.java | 15 +- .../file/EventQueueBackingStoreFile.java | 28 +++- .../file/EventQueueBackingStoreFileV3.java | 8 +- .../apache/flume/channel/file/FileChannel.java | 7 + .../channel/file/FileChannelConfiguration.java | 5 + .../java/org/apache/flume/channel/file/Log.java | 31 ++-- .../flume/channel/file/Serialization.java | 146 ++++++++++++++++++- .../flume/channel/file/TestFileChannelBase.java | 5 + .../channel/file/TestFileChannelRestart.java | 98 ++++++++++++- pom.xml | 6 + 11 files changed, 323 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml index 3113938..7b8114c 100644 --- a/flume-ng-channels/flume-file-channel/pom.xml +++ b/flume-ng-channels/flume-file-channel/pom.xml @@ -107,6 +107,11 @@ <artifactId>mapdb</artifactId> </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </dependency> + </dependencies> <profiles> 
http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java index 07a3781..456df34 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java @@ -38,11 +38,12 @@ class EventQueueBackingStoreFactory { static EventQueueBackingStore get(File checkpointFile, int capacity, String name, boolean upgrade) throws Exception { - return get(checkpointFile, null, capacity, name, upgrade, false); + return get(checkpointFile, null, capacity, name, upgrade, false, false); } static EventQueueBackingStore get(File checkpointFile, File backupCheckpointDir, int capacity,String name, - boolean upgrade, boolean shouldBackup) throws Exception { + boolean upgrade, boolean shouldBackup, boolean compressBackup) + throws Exception { File metaDataFile = Serialization.getMetaDataFile(checkpointFile); RandomAccessFile checkpointFileHandle = null; try { @@ -68,19 +69,19 @@ class EventQueueBackingStoreFactory { throw new IOException("Cannot create " + checkpointFile); } return new EventQueueBackingStoreFileV3(checkpointFile, - capacity, name, backupCheckpointDir, shouldBackup); + capacity, name, backupCheckpointDir, shouldBackup, compressBackup); } // v3 due to meta file, version will be checked by backing store if(metaDataExists) { return new EventQueueBackingStoreFileV3(checkpointFile, capacity, - name, backupCheckpointDir, shouldBackup); + name, backupCheckpointDir, shouldBackup, 
compressBackup); } checkpointFileHandle = new RandomAccessFile(checkpointFile, "r"); int version = (int)checkpointFileHandle.readLong(); if(Serialization.VERSION_2 == version) { if(upgrade) { return upgrade(checkpointFile, capacity, name, backupCheckpointDir, - shouldBackup); + shouldBackup, compressBackup); } return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name); } @@ -101,7 +102,7 @@ class EventQueueBackingStoreFactory { private static EventQueueBackingStore upgrade(File checkpointFile, int capacity, String name, File backupCheckpointDir, - boolean shouldBackup) + boolean shouldBackup, boolean compressBackup) throws Exception { LOG.info("Attempting upgrade of " + checkpointFile + " for " + name); EventQueueBackingStoreFileV2 backingStoreV2 = @@ -114,7 +115,7 @@ class EventQueueBackingStoreFactory { EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile, metaDataFile); return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, - backupCheckpointDir, shouldBackup); + backupCheckpointDir, shouldBackup, compressBackup); } } http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java index 113dcd2..2b0987b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java @@ -56,6 +56,8 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { protected static final int CHECKPOINT_COMPLETE = 0; protected static
final int CHECKPOINT_INCOMPLETE = 1; + protected static final String COMPRESSED_FILE_EXTENSION = ".snappy"; + protected LongBuffer elementsBuffer; protected final Map<Integer, Long> overwriteMap = new HashMap<Integer, Long>(); protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap(); @@ -64,22 +66,24 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { protected final File checkpointFile; private final Semaphore backupCompletedSema = new Semaphore(1); protected final boolean shouldBackup; + protected final boolean compressBackup; private final File backupDir; private final ExecutorService checkpointBackUpExecutor; protected EventQueueBackingStoreFile(int capacity, String name, File checkpointFile) throws IOException, BadCheckpointException { - this(capacity, name, checkpointFile, null, false); + this(capacity, name, checkpointFile, null, false, false); } protected EventQueueBackingStoreFile(int capacity, String name, File checkpointFile, File checkpointBackupDir, - boolean backupCheckpoint) throws IOException, - BadCheckpointException { + boolean backupCheckpoint, boolean compressBackup) + throws IOException, BadCheckpointException { super(capacity, name); this.checkpointFile = checkpointFile; this.shouldBackup = backupCheckpoint; + this.compressBackup = compressBackup; this.backupDir = checkpointBackupDir; checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw"); long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG; @@ -169,8 +173,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { if(Log.EXCLUDES.contains(origFile.getName())) { continue; } - Serialization.copyFile(origFile, new File(backupDirectory, - origFile.getName())); + if (compressBackup && origFile.equals(checkpointFile)) { + Serialization.compressFile(origFile, new File(backupDirectory, + origFile.getName() + COMPRESSED_FILE_EXTENSION)); + } else { + Serialization.copyFile(origFile, new
File(backupDirectory, + origFile.getName())); + } } Preconditions.checkState(!backupFile.exists(), "The backup file exists " + "while it is not supposed to. Are multiple channels configured to use " + @@ -202,7 +211,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { String fileName = backupFile.getName(); if (!fileName.equals(BACKUP_COMPLETE_FILENAME) && !fileName.equals(Log.FILE_LOCK)) { - Serialization.copyFile(backupFile, new File(checkpointDir, fileName)); + if (fileName.endsWith(COMPRESSED_FILE_EXTENSION)) { + Serialization.decompressFile(backupFile, new File(checkpointDir, + fileName.substring(0, + fileName.length() - COMPRESSED_FILE_EXTENSION.length()))); + } else { + Serialization.copyFile(backupFile, new File(checkpointDir, + fileName)); + } } } return true; http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java index c153558..9dfa0d1 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java @@ -40,13 +40,15 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile { EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name) throws IOException, BadCheckpointException { - this(checkpointFile, capacity, name, null, false); + this(checkpointFile, capacity, name, null, false, false); } EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name, File checkpointBackupDir, - boolean
backupCheckpoint) throws IOException, BadCheckpointException { - super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint); + boolean backupCheckpoint, boolean compressBackup) + throws IOException, BadCheckpointException { + super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint, + compressBackup); Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0 " + capacity); metaDataFile = Serialization.getMetaDataFile(checkpointFile); http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index 0f242d2..413bfbc 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -95,6 +95,7 @@ public class FileChannel extends BasicChannelSemantics { private String encryptionActiveKey; private String encryptionCipherProvider; private boolean useDualCheckpoints; + private boolean compressBackupCheckpoint; private boolean fsyncPerTransaction; private int fsyncInterval; @@ -110,6 +111,11 @@ public class FileChannel extends BasicChannelSemantics { useDualCheckpoints = context.getBoolean( FileChannelConfiguration.USE_DUAL_CHECKPOINTS, FileChannelConfiguration.DEFAULT_USE_DUAL_CHECKPOINTS); + + compressBackupCheckpoint = context.getBoolean( + FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, + FileChannelConfiguration.DEFAULT_COMPRESS_BACKUP_CHECKPOINT); + String homePath = System.getProperty("user.home").replace('\\', '/'); String strCheckpointDir = @@ -272,6 +278,7 @@ public class FileChannel
extends BasicChannelSemantics { builder.setEncryptionKeyAlias(encryptionActiveKey); builder.setEncryptionCipherProvider(encryptionCipherProvider); builder.setUseDualCheckpoints(useDualCheckpoints); + builder.setCompressBackupCheckpoint(compressBackupCheckpoint); builder.setBackupCheckpointDir(backupCheckpointDir); builder.setFsyncPerTransaction(fsyncPerTransaction); builder.setFsyncInterval(fsyncInterval); http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java index 87dc653..f8c0378 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java @@ -87,6 +87,10 @@ public class FileChannelConfiguration { public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints"; public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false; + public static final String COMPRESS_BACKUP_CHECKPOINT = + "compressBackupCheckpoint"; + public static final boolean DEFAULT_COMPRESS_BACKUP_CHECKPOINT = false; + public static final String FSYNC_PER_TXN = "fsyncPerTransaction"; public static final boolean DEFAULT_FSYNC_PRE_TXN = true; http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 5bac0f4..5b581e1 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -123,6 +123,7 @@ public class Log { private boolean didFastReplay = false; private boolean didFullReplayDueToBadCheckpointException = false; private final boolean useDualCheckpoints; + private final boolean compressBackupCheckpoint; private volatile boolean backupRestored = false; private final boolean fsyncPerTransaction; @@ -151,6 +152,7 @@ public class Log { private String bEncryptionCipherProvider; private long bUsableSpaceRefreshInterval = 15L * 1000L; private boolean bUseDualCheckpoints = false; + private boolean bCompressBackupCheckpoint = false; private File bBackupCheckpointDir = null; private boolean fsyncPerTransaction = true; @@ -242,6 +244,11 @@ public class Log { return this; } + Builder setCompressBackupCheckpoint(boolean compressBackupCheckpoint) { + this.bCompressBackupCheckpoint = compressBackupCheckpoint; + return this; + } + Builder setBackupCheckpointDir(File backupCheckpointDir) { this.bBackupCheckpointDir = backupCheckpointDir; return this; @@ -249,16 +256,17 @@ public class Log { Log build() throws IOException { return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity, - bUseDualCheckpoints, bCheckpointDir, bBackupCheckpointDir, bName, - useLogReplayV1, useFastReplay, bMinimumRequiredSpace, - bEncryptionKeyProvider, bEncryptionKeyAlias, + bUseDualCheckpoints, bCompressBackupCheckpoint, bCheckpointDir, + bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay, + bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias, bEncryptionCipherProvider, bUsableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval, bLogDirs); } } private Log(long checkpointInterval, long maxFileSize, int queueCapacity, - boolean
useDualCheckpoints, File checkpointDir, File backupCheckpointDir, + boolean useDualCheckpoints, boolean compressBackupCheckpoint, + File checkpointDir, File backupCheckpointDir, String name, boolean useLogReplayV1, boolean useFastReplay, long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider, @Nullable String encryptionKeyAlias, @@ -338,6 +346,7 @@ public class Log { this.maxFileSize = maxFileSize; this.queueCapacity = queueCapacity; this.useDualCheckpoints = useDualCheckpoints; + this.compressBackupCheckpoint = compressBackupCheckpoint; this.checkpointDir = checkpointDir; this.backupCheckpointDir = backupCheckpointDir; this.logDirs = logDirs; @@ -415,9 +424,10 @@ public class Log { try { backingStore = - EventQueueBackingStoreFactory.get(checkpointFile, - backupCheckpointDir, queueCapacity, channelNameDescriptor, - true, this.useDualCheckpoints); + EventQueueBackingStoreFactory.get(checkpointFile, + backupCheckpointDir, queueCapacity, channelNameDescriptor, + true, this.useDualCheckpoints, + this.compressBackupCheckpoint); queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile, queueSetDir); LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified()) @@ -451,9 +461,10 @@ public class Log { "directory to recover from a corrupt or incomplete checkpoint"); } } - backingStore = EventQueueBackingStoreFactory.get(checkpointFile, - backupCheckpointDir, - queueCapacity, channelNameDescriptor, true, useDualCheckpoints); + backingStore = EventQueueBackingStoreFactory.get( + checkpointFile, backupCheckpointDir, queueCapacity, + channelNameDescriptor, true, useDualCheckpoints, + compressBackupCheckpoint); queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile, queueSetDir); // If the checkpoint was deleted due to BadCheckpointException, then http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java index d55660d..a6eda75 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java @@ -25,11 +25,14 @@ import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.util.Collections; @@ -52,8 +55,8 @@ public class Serialization { public static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old"; - // 64 K buffer to copy files. - private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024; + // 64 K buffer to copy, compress and decompress files.
+ private static final int FILE_BUFFER_SIZE = 64 * 1024; public static final Logger LOG = LoggerFactory.getLogger(Serialization.class); @@ -140,7 +143,7 @@ public class Serialization { try { in = new BufferedInputStream(new FileInputStream(from)); out = new RandomAccessFile(to, "rw"); - byte[] buf = new byte[FILE_COPY_BUFFER_SIZE]; + byte[] buf = new byte[FILE_BUFFER_SIZE]; int total = 0; while(true) { int read = in.read(buf); @@ -184,4 +187,151 @@ public class Serialization { throw new IOException("Copying file: " + from.toString() + " to: " + to .toString() + " may have failed."); } + + /** + * Compress file using Snappy + * @param uncompressed File to compress - this file should exist + * @param compressed Compressed file - this file should not exist + * @return always true; any failure is propagated as an exception + */ + public static boolean compressFile(File uncompressed, File compressed) + throws IOException { + Preconditions.checkNotNull(uncompressed, + "Source file is null, compression failed."); + Preconditions.checkNotNull(compressed, + "Destination file is null, compression failed."); + Preconditions.checkState(uncompressed.exists(), "Source file: " + + uncompressed.toString() + " does not exist."); + Preconditions.checkState(!compressed.exists(), + "Compressed file: " + compressed.toString() + " unexpectedly " + + "exists."); + + BufferedInputStream in = null; + FileOutputStream out = null; + SnappyOutputStream snappyOut = null; + try { + in = new BufferedInputStream(new FileInputStream(uncompressed)); + out = new FileOutputStream(compressed); + snappyOut = new SnappyOutputStream(out); + + byte[] buf = new byte[FILE_BUFFER_SIZE]; + while(true) { + int read = in.read(buf); + if (read == -1) { + break; + } + snappyOut.write(buf, 0, read); + } + // SnappyOutputStream buffers data internally; flush it into the file + // before the fsync, otherwise the sync can miss the tail of the data. + snappyOut.flush(); + out.getFD().sync(); + return true; + } catch (Exception ex) { + LOG.error("Error while attempting to compress " + + uncompressed.toString() + " to " + compressed.toString() + + ".", ex); + Throwables.propagate(ex); + } finally {
Throwable th = null; + try { + if (in != null) { + in.close(); + } + } catch (Throwable ex) { + LOG.error("Error while closing input file.", ex); + th = ex; + } + try { + // Closing the Snappy stream also closes the wrapped file stream; close + // the file stream directly only if the wrapper was never created. + if (snappyOut != null) { + snappyOut.close(); + } else if (out != null) { + out.close(); + } + } catch (IOException ex) { + LOG.error("Error while closing output file.", ex); + Throwables.propagate(ex); + } + if (th != null) { + Throwables.propagate(th); + } + } + // Should never reach here. + throw new IOException("Compressing file: " + uncompressed.toString() + + " to: " + compressed.toString() + " may have failed."); + } + + /** + * Decompress file using Snappy + * @param compressed File to decompress - this file should exist + * @param decompressed Decompressed file - this file should not exist + * @return always true; any failure is propagated as an exception + */ + public static boolean decompressFile(File compressed, File decompressed) + throws IOException { + Preconditions.checkNotNull(compressed, + "Source file is null, decompression failed."); + Preconditions.checkNotNull(decompressed, "Destination file is " + + "null, decompression failed."); + Preconditions.checkState(compressed.exists(), "Source file: " + + compressed.toString() + " does not exist."); + Preconditions.checkState(!decompressed.exists(), + "Decompressed file: " + decompressed.toString() + + " unexpectedly exists."); + + BufferedInputStream in = null; + SnappyInputStream snappyIn = null; + FileOutputStream out = null; + try { + in = new BufferedInputStream(new FileInputStream(compressed)); + snappyIn = new SnappyInputStream(in); + out = new FileOutputStream(decompressed); + + byte[] buf = new byte[FILE_BUFFER_SIZE]; + while(true) { + int read = snappyIn.read(buf); + if (read == -1) { + break; + } + out.write(buf, 0, read); + } + out.getFD().sync(); + return true; + } catch (Exception ex) { + LOG.error("Error while attempting to decompress " + + compressed.toString() + " to " + decompressed.toString() + + ".", ex); + Throwables.propagate(ex); + } finally { + Throwable th = null; + try { + // Closing the Snappy stream also closes the stream it wraps. + if (snappyIn != null) { + snappyIn.close(); + } else if
(in != null) { + in.close(); + } + } catch (Throwable ex) { + LOG.error("Error while closing input file.", ex); + th = ex; + } + try { + if (out != null) { + out.close(); + } + } catch (IOException ex) { + LOG.error("Error while closing output file.", ex); + Throwables.propagate(ex); + } + if (th != null) { + Throwables.propagate(th); + } + } + // Should never reach here. + throw new IOException("Decompressing file: " + + compressed.toString() + " to: " + decompressed.toString() + + " may have failed."); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java index 1ee5320..9901b69 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java @@ -38,12 +38,17 @@ public class TestFileChannelBase { protected File[] dataDirs; protected String dataDir; protected File backupDir; + protected File uncompressedBackupCheckpoint; + protected File compressedBackupCheckpoint; @Before public void setup() throws Exception { baseDir = Files.createTempDir(); checkpointDir = new File(baseDir, "chkpt"); backupDir = new File(baseDir, "backup"); + uncompressedBackupCheckpoint = new File(backupDir, "checkpoint"); + compressedBackupCheckpoint = new File(backupDir, + "checkpoint.snappy"); Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory()); Assert.assertTrue(backupDir.mkdirs() || backupDir.isDirectory()); dataDirs = new File[3];
http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index d16f3d5..0c6afc4 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -693,13 +693,27 @@ public class TestFileChannelRestart extends TestFileChannelBase { // Make sure the entire channel was not replayed, only the events from the // backup. @Test - public void testBackupUsedEnsureNoFullReplay() throws Exception { + public void testBackupUsedEnsureNoFullReplayWithoutCompression() throws + Exception { + testBackupUsedEnsureNoFullReplay(false); + } + @Test + public void testBackupUsedEnsureNoFullReplayWithCompression() throws + Exception { + testBackupUsedEnsureNoFullReplay(true); + } + + private void testBackupUsedEnsureNoFullReplay(boolean compressedBackup) + throws Exception { File dataDir = Files.createTempDir(); File tempBackup = Files.createTempDir(); Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.DATA_DIRS, dataDir.getAbsolutePath()); - overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true"); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, + "true"); + overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, + String.valueOf(compressedBackup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -831,6 +845,87 @@ public class TestFileChannelRestart extends TestFileChannelBase { } } + 
@Test + public void testCompressBackup() throws Throwable { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, + "true"); + overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000"); + overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, + "true"); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + putEvents(channel, "restart", 10, 100); + forceCheckpoint(channel); + + // Wait for the backup checkpoint + Thread.sleep(2000); + + Assert.assertTrue(compressedBackupCheckpoint.exists()); + + Serialization.decompressFile(compressedBackupCheckpoint, + uncompressedBackupCheckpoint); + + File checkpoint = new File(checkpointDir, "checkpoint"); + Assert.assertTrue(FileUtils.contentEquals(checkpoint, + uncompressedBackupCheckpoint)); + + channel.stop(); + } + + @Test + public void testToggleCheckpointCompressionFromTrueToFalse() + throws Exception { + restartToggleCompression(true); + } + + @Test + public void testToggleCheckpointCompressionFromFalseToTrue() + throws Exception { + restartToggleCompression(false); + } + + public void restartToggleCompression(boolean originalCheckpointCompressed) + throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, + "true"); + overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000"); + overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, + String.valueOf(originalCheckpointCompressed)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> in = fillChannel(channel, "restart"); + forceCheckpoint(channel); + Thread.sleep(2000); + Assert.assertEquals(originalCheckpointCompressed, + compressedBackupCheckpoint.exists()); + Assert.assertEquals(!originalCheckpointCompressed, + uncompressedBackupCheckpoint.exists()); + channel.stop(); + File checkpoint = new
File(checkpointDir, "checkpoint"); + Assert.assertTrue(checkpoint.delete()); + File checkpointMetaData = Serialization.getMetaDataFile( + checkpoint); + Assert.assertTrue(checkpointMetaData.delete()); + overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, + String.valueOf(!originalCheckpointCompressed)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> out = consumeChannel(channel); + compareInputAndOut(in, out); + forceCheckpoint(channel); + Thread.sleep(2000); + Assert.assertEquals(!originalCheckpointCompressed, + compressedBackupCheckpoint.exists()); + Assert.assertEquals(originalCheckpointCompressed, + uncompressedBackupCheckpoint.exists()); + channel.stop(); + } + private static void slowdownBackup(FileChannel channel) { Log log = field("log").ofType(Log.class).in(channel).get(); http://git-wip-us.apache.org/repos/asf/flume/blob/69fd6b3a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5d31d4c..541548f 100644 --- a/pom.xml +++ b/pom.xml @@ -1270,6 +1270,12 @@ limitations under the License. <version>${kite.version}</version> </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.0</version> + </dependency> + </dependencies> </dependencyManagement>
