Updated Branches: refs/heads/flume-1.4 542f87e63 -> e9b91ee2a
http://git-wip-us.apache.org/repos/asf/flume/blob/e9b91ee2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index 170dc72..fb0e208 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -18,12 +18,13 @@ */ package org.apache.flume.channel.file; -import static org.apache.flume.channel.file.TestUtils.*; - -import java.io.File; -import java.util.Map; -import java.util.Set; - +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.flume.channel.file.proto.ProtosFactory; +import org.fest.reflect.exception.ReflectionError; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -31,12 +32,23 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; +import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; import java.io.RandomAccessFile; +import java.util.Map; import java.util.Random; -import org.apache.flume.channel.file.proto.ProtosFactory; +import java.util.Set; + +import static org.apache.flume.channel.file.TestUtils.compareInputAndOut; +import static org.apache.flume.channel.file.TestUtils.consumeChannel; +import static org.apache.flume.channel.file.TestUtils.fillChannel; +import static org.apache.flume.channel.file.TestUtils.forceCheckpoint; +import static org.apache.flume.channel.file.TestUtils.putEvents; +import static org.apache.flume.channel.file.TestUtils.takeEvents; +import static org.fest.reflect.core.Reflection.*; public class TestFileChannelRestart extends TestFileChannelBase { protected static final Logger LOG = LoggerFactory @@ -119,16 +131,32 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } + + @Test + public void testRestartWhenMetaDataExistsButCheckpointDoesNot() throws + Exception { + doTestRestartWhenMetaDataExistsButCheckpointDoesNot(false); + } + @Test - public void testRestartWhenMetaDataExistsButCheckpointDoesNot() + public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup() throws Exception { + doTestRestartWhenMetaDataExistsButCheckpointDoesNot(true); + } + + private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot( + boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); Assert.assertTrue(checkpoint.delete()); @@ -139,19 +167,36 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertTrue(channel.isOpen()); Assert.assertTrue(checkpoint.exists()); Assert.assertTrue(checkpointMetaData.exists()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } + + @Test + public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception{ + doTestRestartWhenCheckpointExistsButMetaDoesNot(false); + } + @Test - public void testRestartWhenCheckpointExistsButMetaDoesNot() + public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws + Exception{ + doTestRestartWhenCheckpointExistsButMetaDoesNot(true); + } + + + private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); @@ -162,19 +207,34 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertTrue(channel.isOpen()); Assert.assertTrue(checkpoint.exists()); Assert.assertTrue(checkpointMetaData.exists()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test public void testRestartWhenNoCheckpointExists() throws Exception { + doTestRestartWhenNoCheckpointExists(false); + } + + @Test + public void testRestartWhenNoCheckpointExistsWithBackup() throws Exception { + doTestRestartWhenNoCheckpointExists(true); + } + + private void doTestRestartWhenNoCheckpointExists(boolean backup) throws + Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); @@ -185,19 +245,33 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertTrue(channel.isOpen()); Assert.assertTrue(checkpoint.exists()); Assert.assertTrue(checkpointMetaData.exists()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test - public void testBadCheckpointVersion() throws Exception{ + public void testBadCheckpointVersion() throws Exception { + doTestBadCheckpointVersion(false); + } + + @Test + public void testBadCheckpointVersionWithBackup() throws Exception { + doTestBadCheckpointVersion(true); + } + + private void doTestBadCheckpointVersion(boolean backup) throws Exception{ Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); @@ -209,19 +283,34 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test public void testBadCheckpointMetaVersion() throws Exception { + doTestBadCheckpointMetaVersion(false); + } + + @Test + public void testBadCheckpointMetaVersionWithBackup() throws Exception { + doTestBadCheckpointMetaVersion(true); + } + + private void doTestBadCheckpointMetaVersion(boolean backup) throws + Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint)); @@ -235,19 +324,35 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test public void testDifferingOrderIDCheckpointAndMetaVersion() throws Exception { + doTestDifferingOrderIDCheckpointAndMetaVersion(false); + } + + @Test + public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws + Exception { + doTestDifferingOrderIDCheckpointAndMetaVersion(true); + } + + private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup) + throws Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint)); @@ -261,19 +366,33 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test - public void testIncompleteCheckpoint() throws Exception { + public void testIncompleteCheckpoint() throws Exception{ + doTestIncompleteCheckpoint(false); + } + + @Test + public void testIncompleteCheckpointWithCheckpoint() throws Exception{ + doTestIncompleteCheckpoint(true); + } + + private void doTestIncompleteCheckpoint(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); @@ -285,18 +404,29 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test public void testCorruptInflightPuts() throws Exception { - testCorruptInflights("inflightPuts"); + doTestCorruptInflights("inflightPuts", false); + } + + @Test + public void testCorruptInflightPutsWithBackup() throws Exception { + doTestCorruptInflights("inflightPuts", true); } @Test public void testCorruptInflightTakes() throws Exception { - testCorruptInflights("inflightTakes"); + doTestCorruptInflights("inflightTakes", false); + } + + @Test + public void testCorruptInflightTakesWithBackup() throws Exception { + doTestCorruptInflights("inflightTakes", true); } @Test @@ -352,14 +482,19 @@ public class TestFileChannelRestart extends TestFileChannelBase { compareInputAndOut(in, out); } - private void testCorruptInflights(String name) throws Exception { + private void doTestCorruptInflights(String name, + boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File inflight = new File(checkpointDir, name); RandomAccessFile writer = new RandomAccessFile(inflight, "rw"); @@ -368,19 +503,33 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test public void testTruncatedCheckpointMeta() throws Exception { + doTestTruncatedCheckpointMeta(false); + } + + @Test + public void testTruncatedCheckpointMetaWithBackup() throws Exception { + doTestTruncatedCheckpointMeta(true); + } + + private void doTestTruncatedCheckpointMeta(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile( @@ -391,19 +540,33 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } @Test public void testCorruptCheckpointMeta() throws Exception { + doTestCorruptCheckpointMeta(false); + } + + @Test + public void testCorruptCheckpointMetaWithBackup() throws Exception { + doTestCorruptCheckpointMeta(true); + } + + private void doTestCorruptCheckpointMeta(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); + if(backup) { + Thread.sleep(2000); + } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile( @@ -415,10 +578,19 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(!backup || channel.checkpointBackupRestored()); Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } + private void checkIfBackupUsed(boolean backup) { + boolean backupRestored = channel.checkpointBackupRestored(); + if (backup) { + Assert.assertTrue(backupRestored); + } else { + Assert.assertFalse(backupRestored); + } + } @Test public void testWithExtraLogs() @@ -445,4 +617,158 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } + + // Make sure the entire channel was not replayed, only the events from the + // backup. + @Test + public void testBackupUsedEnsureNoFullReplay() throws Exception { + File dataDir = Files.createTempDir(); + File tempBackup = Files.createTempDir(); + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.DATA_DIRS, + dataDir.getAbsolutePath()); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true"); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> in = putEvents(channel, "restart", 10, 100); + Assert.assertEquals(100, in.size()); + forceCheckpoint(channel); + Thread.sleep(2000); + in = putEvents(channel, "restart", 10, 100); + takeEvents(channel, 10, 100); + Assert.assertEquals(100, in.size()); + for(File file : backupDir.listFiles()) { + if(file.getName().equals(Log.FILE_LOCK)) { + continue; + } + Files.copy(file, new File(tempBackup, file.getName())); + } + forceCheckpoint(channel); + channel.stop(); + + Serialization.deleteAllFiles(checkpointDir, Log.EXCLUDES); + // The last checkpoint may have been already backed up (it did while I + // was running this test, since the checkpoint itself is tiny in unit + // tests), so throw away the backup and force the use of an older backup by + // bringing in the copy of the last backup before the checkpoint. + Serialization.deleteAllFiles(backupDir, Log.EXCLUDES); + for(File file : tempBackup.listFiles()) { + if(file.getName().equals(Log.FILE_LOCK)) { + continue; + } + Files.copy(file, new File(backupDir, file.getName())); + } + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + checkIfBackupUsed(true); + Assert.assertEquals(100, channel.getLog().getPutCount()); + Assert.assertEquals(20, channel.getLog().getCommittedCount()); + Assert.assertEquals(100, channel.getLog().getTakeCount()); + Assert.assertEquals(0, channel.getLog().getRollbackCount()); + //Read Count = 100 puts + 10 commits + 100 takes + 10 commits + Assert.assertEquals(220, channel.getLog().getReadCount()); + consumeChannel(channel); + FileUtils.deleteQuietly(dataDir); + FileUtils.deleteQuietly(tempBackup); + } + + //Make sure data files required by the backup checkpoint are not deleted. + @Test + public void testDataFilesRequiredByBackupNotDeleted() throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true"); + overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000"); + channel = createFileChannel(overrides); + channel.start(); + String prefix = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; + Assert.assertTrue(channel.isOpen()); + putEvents(channel, prefix, 10, 100); + Set<String> origFiles = Sets.newHashSet(); + for(File dir : dataDirs) { + origFiles.addAll(Lists.newArrayList(dir.list())); + } + forceCheckpoint(channel); + takeEvents(channel, 10, 50); + long beforeSecondCheckpoint = System.currentTimeMillis(); + forceCheckpoint(channel); + Set<String> newFiles = Sets.newHashSet(); + int olderThanCheckpoint = 0; + int totalMetaFiles = 0; + for(File dir : dataDirs) { + File[] metadataFiles = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (name.endsWith(".meta")) { + return true; + } + return false; + } + }); + totalMetaFiles = metadataFiles.length; + for(File metadataFile : metadataFiles) { + if(metadataFile.lastModified() < beforeSecondCheckpoint) { + olderThanCheckpoint++; + } + } + newFiles.addAll(Lists.newArrayList(dir.list())); + } + /* + * Files which are not required by the new checkpoint should not have been + * modified by the checkpoint. + */ + Assert.assertTrue(olderThanCheckpoint > 0); + Assert.assertTrue(totalMetaFiles != olderThanCheckpoint); + + /* + * All files needed by original checkpoint should still be there. + */ + Assert.assertTrue(newFiles.containsAll(origFiles)); + takeEvents(channel, 10, 50); + forceCheckpoint(channel); + newFiles = Sets.newHashSet(); + for(File dir : dataDirs) { + newFiles.addAll(Lists.newArrayList(dir.list())); + } + Assert.assertTrue(!newFiles.containsAll(origFiles)); + } + + @Test (expected = IOException.class) + public void testSlowBackup() throws Throwable { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true"); + overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000"); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> in = putEvents(channel, "restart", 10, 100); + Assert.assertEquals(100, in.size()); + slowdownBackup(channel); + forceCheckpoint(channel); + in = putEvents(channel, "restart", 10, 100); + takeEvents(channel, 10, 100); + Assert.assertEquals(100, in.size()); + try { + forceCheckpoint(channel); + } catch (ReflectionError ex) { + throw ex.getCause(); + } finally { + channel.stop(); + } + } + + private static void slowdownBackup(FileChannel channel) { + Log log = field("log").ofType(Log.class).in(channel).get(); + + FlumeEventQueue queue = field("queue") + .ofType(FlumeEventQueue.class) + .in(log).get(); + + EventQueueBackingStore backingStore = field("backingStore") + .ofType(EventQueueBackingStore.class) + .in(queue).get(); + + field("slowdownBackup").ofType(Boolean.class).in(backingStore).set(true); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/e9b91ee2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index ba653e6..7c490b5 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -237,7 +237,7 @@ public class TestUtils { } catch (Exception ex) { transaction.rollback(); if(untilCapacityIsReached && ex instanceof ChannelException && - ("The channel has reached it's capacity. " + ("The channel has reached it's capacity. " + "This might be the result of a sink on the channel having too " + "low of batch size, a downstream system running slower than " + "normal, or that the channel capacity is just too low. " @@ -260,10 +260,13 @@ public class TestUtils { } public static Context createFileChannelContext(String checkpointDir, - String dataDir, Map<String, String> overrides) { + String dataDir, String backupDir, Map<String, String> overrides) { Context context = new Context(); context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir); + if(backupDir != null) { + context.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, backupDir); + } context.put(FileChannelConfiguration.DATA_DIRS, dataDir); context.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(1)); context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); @@ -273,10 +276,16 @@ public class TestUtils { return context; } public static FileChannel createFileChannel(String checkpointDir, - String dataDir, Map<String, String> overrides) { + String dataDir, Map<String, String> overrides) { + return createFileChannel(checkpointDir, dataDir, null, overrides); + } + + public static FileChannel createFileChannel(String checkpointDir, + String dataDir, String backupDir, Map<String, String> overrides) { FileChannel channel = new FileChannel(); channel.setName("FileChannel-" + UUID.randomUUID()); - Context context = createFileChannelContext(checkpointDir, dataDir, overrides); + Context context = createFileChannelContext(checkpointDir, dataDir, + backupDir, overrides); Configurables.configure(channel, context); return channel; } http://git-wip-us.apache.org/repos/asf/flume/blob/e9b91ee2/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 16fba45..693c0d7 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1930,6 +1930,8 @@ Property Name Default Description ================================================ ================================ ======================================================== **type** -- The component type name, needs to be ``file``. checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored +useDualCheckpoints false Backup the checkpoint. If this is set to ``true``, ``backupCheckpointDir`` **must** be set +backupCheckpointDir -- The directory where the checkpoint is backed up to. This directory **must not** be the same as the data directories or the checkpoint directory dataDirs ~/.flume/file-channel/data The directory where log files will be stored transactionCapacity 1000 The maximum size of transaction supported by the channel checkpointInterval 30000 Amount of time (in millis) between checkpoints
