Repository: flume Updated Branches: refs/heads/trunk 66327aa20 -> 4d79aa003
FLUME-3152 Add Flume Metric for Backup Checkpoint Errors This change adds a new metric (channel.file.checkpoint.backup.write.error) to the File Channel. It gets incremented if an exception happens during backup checkpoint writes. This closes #156 Reviewers: Denes Arvay (Ferenc Szabo via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d79aa00 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d79aa00 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d79aa00 Branch: refs/heads/trunk Commit: 4d79aa003aa02e8d513a1ae1406795d758143397 Parents: 66327aa Author: Ferenc Szabo <[email protected]> Authored: Mon Aug 21 14:29:38 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Wed Aug 23 18:18:39 2017 +0200 ---------------------------------------------------------------------- .../flume/channel/file/CheckpointRebuilder.java | 3 +- .../file/EventQueueBackingStoreFactory.java | 41 +++++---- .../file/EventQueueBackingStoreFile.java | 24 ++++-- .../file/EventQueueBackingStoreFileV2.java | 8 +- .../file/EventQueueBackingStoreFileV3.java | 12 +-- .../java/org/apache/flume/channel/file/Log.java | 4 +- .../instrumentation/FileChannelCounter.java | 16 +++- .../FileChannelCounterMBean.java | 7 ++ .../flume/channel/file/TestCheckpoint.java | 3 +- .../channel/file/TestCheckpointRebuilder.java | 3 +- .../file/TestEventQueueBackingStoreFactory.java | 87 +++++++++++++------- .../file/TestFileChannelErrorMetrics.java | 67 +++++++++++++++ .../flume/channel/file/TestFlumeEventQueue.java | 19 +++-- 13 files changed, 220 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java index a0ecdeb..8fbf3c8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -28,6 +28,7 @@ import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,7 +241,7 @@ public class CheckpointRebuilder { } else { EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.get(checkpointFile, - capacity, "channel"); + capacity, "channel", new FileChannelCounter("Main")); FlumeEventQueue queue = new FlumeEventQueue(backingStore, new File(checkpointDir, "inflighttakes"), new File(checkpointDir, "inflightputs"), http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java index dcd6f98..7f8b3f6 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java @@ -19,6 +19,7 @@ package org.apache.flume.channel.file; import 
com.google.common.io.Files; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,19 +32,22 @@ class EventQueueBackingStoreFactory { private EventQueueBackingStoreFactory() {} - static EventQueueBackingStore get(File checkpointFile, int capacity, - String name) throws Exception { - return get(checkpointFile, capacity, name, true); + static EventQueueBackingStore get( + File checkpointFile, int capacity, String name, FileChannelCounter counter + ) throws Exception { + return get(checkpointFile, capacity, name, counter, true); } - static EventQueueBackingStore get(File checkpointFile, int capacity, - String name, boolean upgrade) throws Exception { - return get(checkpointFile, null, capacity, name, upgrade, false, false); + static EventQueueBackingStore get( + File checkpointFile, int capacity, String name, FileChannelCounter counter, boolean upgrade + ) throws Exception { + return get(checkpointFile, null, capacity, name, counter, upgrade, false, false); } - static EventQueueBackingStore get(File checkpointFile, File backupCheckpointDir, - int capacity, String name, boolean upgrade, - boolean shouldBackup, boolean compressBackup) throws Exception { + static EventQueueBackingStore get( + File checkpointFile, File backupCheckpointDir, int capacity, String name, + FileChannelCounter counter, boolean upgrade, boolean shouldBackup, boolean compressBackup + ) throws Exception { File metaDataFile = Serialization.getMetaDataFile(checkpointFile); RandomAccessFile checkpointFileHandle = null; try { @@ -69,21 +73,21 @@ class EventQueueBackingStoreFactory { throw new IOException("Cannot create " + checkpointFile); } return new EventQueueBackingStoreFileV3(checkpointFile, - capacity, name, backupCheckpointDir, shouldBackup, compressBackup); + capacity, name, counter, backupCheckpointDir, shouldBackup, compressBackup); } // v3 due to meta file, version will be checked by backing store if 
(metaDataExists) { return new EventQueueBackingStoreFileV3(checkpointFile, capacity, - name, backupCheckpointDir, shouldBackup, compressBackup); + name, counter, backupCheckpointDir, shouldBackup, compressBackup); } checkpointFileHandle = new RandomAccessFile(checkpointFile, "r"); int version = (int) checkpointFileHandle.readLong(); if (Serialization.VERSION_2 == version) { if (upgrade) { return upgrade(checkpointFile, capacity, name, backupCheckpointDir, - shouldBackup, compressBackup); + shouldBackup, compressBackup, counter); } - return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name); + return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, counter); } LOG.error("Found version " + Integer.toHexString(version) + " in " + checkpointFile); @@ -100,12 +104,13 @@ class EventQueueBackingStoreFactory { } } - private static EventQueueBackingStore upgrade(File checkpointFile, int capacity, String name, - File backupCheckpointDir, boolean shouldBackup, - boolean compressBackup) throws Exception { + private static EventQueueBackingStore upgrade( + File checkpointFile, int capacity, String name, File backupCheckpointDir, + boolean shouldBackup, boolean compressBackup, FileChannelCounter counter + ) throws Exception { LOG.info("Attempting upgrade of " + checkpointFile + " for " + name); EventQueueBackingStoreFileV2 backingStoreV2 = - new EventQueueBackingStoreFileV2(checkpointFile, capacity, name); + new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, counter); String backupName = checkpointFile.getName() + "-backup-" + System.currentTimeMillis(); Files.copy(checkpointFile, @@ -113,7 +118,7 @@ class EventQueueBackingStoreFactory { File metaDataFile = Serialization.getMetaDataFile(checkpointFile); EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile, metaDataFile); - return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, + return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, 
counter, backupCheckpointDir, shouldBackup, compressBackup); } http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java index 73f1d4c..445d912 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Maps; import com.google.common.collect.SetMultimap; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap(); protected final MappedByteBuffer mappedBuffer; protected final RandomAccessFile checkpointFileHandle; + private final FileChannelCounter fileChannelCounter; protected final File checkpointFile; private final Semaphore backupCompletedSema = new Semaphore(1); protected final boolean shouldBackup; @@ -67,17 +69,18 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { private final File backupDir; private final ExecutorService checkpointBackUpExecutor; - protected EventQueueBackingStoreFile(int capacity, String name, - File checkpointFile) throws IOException, - BadCheckpointException { - this(capacity, name, checkpointFile, 
null, false, false); + protected EventQueueBackingStoreFile( + int capacity, String name, FileChannelCounter fileChannelCounter, File checkpointFile + ) throws IOException, BadCheckpointException { + this(capacity, name, fileChannelCounter, checkpointFile, null, false, false); } - protected EventQueueBackingStoreFile(int capacity, String name, - File checkpointFile, File checkpointBackupDir, - boolean backupCheckpoint, boolean compressBackup) - throws IOException, BadCheckpointException { + protected EventQueueBackingStoreFile( + int capacity, String name, FileChannelCounter fileChannelCounter, File checkpointFile, + File checkpointBackupDir, boolean backupCheckpoint, boolean compressBackup + ) throws IOException, BadCheckpointException { super(capacity, name); + this.fileChannelCounter = fileChannelCounter; this.checkpointFile = checkpointFile; this.shouldBackup = backupCheckpoint; this.compressBackup = compressBackup; @@ -294,6 +297,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { try { backupCheckpoint(backupDir); } catch (Throwable throwable) { + fileChannelCounter.incrementCheckpointBackupWriteErrorCount(); error = true; LOG.error("Backing up of checkpoint directory failed.", throwable); } finally { @@ -432,7 +436,9 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { } int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L); EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile) - EventQueueBackingStoreFactory.get(file, capacity, "debug", false); + EventQueueBackingStoreFactory.get( + file, capacity, "debug", new FileChannelCounter("Main"), false + ); System.out.println("File Reference Counts" + backingStore.logFileIDReferenceCounts); System.out.println("Queue Capacity " + backingStore.getCapacity()); http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java 
---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java index 71183aa..3711a78 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile { @@ -33,9 +34,10 @@ final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile { private static final int INDEX_ACTIVE_LOG = 5; private static final int MAX_ACTIVE_LOGS = 1024; - EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name) - throws IOException, BadCheckpointException { - super(capacity, name, checkpointFile); + EventQueueBackingStoreFileV2( + File checkpointFile, int capacity, String name, FileChannelCounter counter + ) throws IOException, BadCheckpointException { + super(capacity, name, counter, checkpointFile); Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0 " + capacity); http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java index f1a892a..da5a082 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java @@ -20,6 +20,7 @@ package org.apache.flume.channel.file; import com.google.common.base.Preconditions; import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.apache.flume.channel.file.proto.ProtosFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,16 +37,17 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile { private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFileV3.class); private final File metaDataFile; - EventQueueBackingStoreFileV3(File checkpointFile, int capacity, - String name) throws IOException, BadCheckpointException { - this(checkpointFile, capacity, name, null, false, false); + EventQueueBackingStoreFileV3( + File checkpointFile, int capacity, String name, FileChannelCounter counter + ) throws IOException, BadCheckpointException { + this(checkpointFile, capacity, name, counter, null, false, false); } EventQueueBackingStoreFileV3(File checkpointFile, int capacity, - String name, File checkpointBackupDir, + String name, FileChannelCounter counter, File checkpointBackupDir, boolean backupCheckpoint, boolean compressBackup) throws IOException, BadCheckpointException { - super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint, + super(capacity, name, counter, checkpointFile, checkpointBackupDir, backupCheckpoint, compressBackup); Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0 " + capacity); 
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 1662a5b..efc8d14 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -450,7 +450,7 @@ public class Log { backingStore = EventQueueBackingStoreFactory.get(checkpointFile, backupCheckpointDir, queueCapacity, channelNameDescriptor, - true, this.useDualCheckpoints, + channelCounter, true, this.useDualCheckpoints, this.compressBackupCheckpoint); queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile, queueSetDir); @@ -487,7 +487,7 @@ public class Log { } backingStore = EventQueueBackingStoreFactory.get( checkpointFile, backupCheckpointDir, queueCapacity, - channelNameDescriptor, true, useDualCheckpoints, + channelNameDescriptor, channelCounter, true, useDualCheckpoints, compressBackupCheckpoint); queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile, queueSetDir); http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java index 40470a8..6cec3da 100644 --- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java @@ -28,10 +28,15 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou private static final String EVENT_PUT_ERROR_COUNT = "channel.file.event.put.error"; private static final String EVENT_TAKE_ERROR_COUNT = "channel.file.event.take.error"; private static final String CHECKPOINT_WRITE_ERROR_COUNT = "channel.file.checkpoint.write.error"; + private static final String CHECKPOINT_BACKUP_WRITE_ERROR_COUNT + = "channel.file.checkpoint.backup.write.error"; public FileChannelCounter(String name) { super(name, new String[] { - EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT, CHECKPOINT_WRITE_ERROR_COUNT }); + EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT, + CHECKPOINT_WRITE_ERROR_COUNT, CHECKPOINT_BACKUP_WRITE_ERROR_COUNT + } + ); } @Override @@ -83,4 +88,13 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou public void incrementCheckpointWriteErrorCount() { increment(CHECKPOINT_WRITE_ERROR_COUNT); } + + @Override + public long getCheckpointBackupWriteErrorCount() { + return get(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT); + } + + public void incrementCheckpointBackupWriteErrorCount() { + increment(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java index 
175b1f4..9386094 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java @@ -62,4 +62,11 @@ public interface FileChannelCounterMBean extends ChannelCounterMBean { * @see org.apache.flume.channel.file.Log.BackgroundWorker#run() */ long getCheckpointWriteErrorCount(); + + /** + * A count of the number of errors encountered while trying to write the backup checkpoints. This + * includes any Throwables. + * @see org.apache.flume.channel.file.EventQueueBackingStoreFile#startBackupThread() + */ + long getCheckpointBackupWriteErrorCount(); } http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java index cd1dcd9..1e00ee2 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java @@ -23,6 +23,7 @@ import java.io.IOException; import junit.framework.Assert; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,7 +52,7 @@ public class TestCheckpoint { @Test public void testSerialization() throws Exception { EventQueueBackingStore backingStore = - new EventQueueBackingStoreFileV2(file, 1, "test"); + new EventQueueBackingStoreFileV2(file, 1, "test", new FileChannelCounter("test")); FlumeEventPointer ptrIn = new 
FlumeEventPointer(10, 20); FlumeEventQueue queueIn = new FlumeEventQueue(backingStore, inflightTakes, inflightPuts, queueSet); http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java index c6c6ad3..6c91661 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java @@ -24,6 +24,7 @@ import java.io.File; import java.util.Map; import java.util.Set; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -70,7 +71,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase { Assert.assertTrue(inflightPutsFile.delete()); EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.get(checkpointFile, 50, - "test"); + "test", new FileChannelCounter("test")); FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile, queueSetDir); CheckpointRebuilder checkpointRebuilder = http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java index 0939454..7aebb03 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java @@ -23,6 +23,7 @@ import com.google.common.io.Files; import com.google.protobuf.InvalidProtocolBufferException; import junit.framework.Assert; import org.apache.commons.io.FileUtils; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.apache.flume.channel.file.proto.ProtosFactory; import org.junit.After; import org.junit.Before; @@ -75,29 +76,39 @@ public class TestEventQueueBackingStoreFactory { @Test public void testWithNoFlag() throws Exception { - verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test"), - Serialization.VERSION_3, pointersInTestCheckpoint); + verify( + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")), + Serialization.VERSION_3, pointersInTestCheckpoint + ); } @Test public void testWithFlag() throws Exception { - verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", true), - Serialization.VERSION_3, pointersInTestCheckpoint); + verify( + EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test"), true + ), + Serialization.VERSION_3, pointersInTestCheckpoint + ); } @Test public void testNoUprade() throws Exception { - verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false), - Serialization.VERSION_2, pointersInTestCheckpoint); + verify( + EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test"), false + ), + Serialization.VERSION_2, pointersInTestCheckpoint + ); } @Test(expected = BadCheckpointException.class) public void testDecreaseCapacity() throws Exception { 
Assert.assertTrue(checkpoint.delete()); EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); - EventQueueBackingStoreFactory.get(checkpoint, 9, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 9, "test", new FileChannelCounter("test")); Assert.fail(); } @@ -105,17 +116,21 @@ public class TestEventQueueBackingStoreFactory { public void testIncreaseCapacity() throws Exception { Assert.assertTrue(checkpoint.delete()); EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); - EventQueueBackingStoreFactory.get(checkpoint, 11, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 11, "test", new FileChannelCounter("test")); Assert.fail(); } @Test public void testNewCheckpoint() throws Exception { Assert.assertTrue(checkpoint.delete()); - verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false), - Serialization.VERSION_3, Collections.<Long>emptyList()); + verify( + EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test"), false + ), + Serialization.VERSION_3, Collections.<Long>emptyList() + ); } @Test(expected = BadCheckpointException.class) @@ -123,13 +138,15 @@ public class TestEventQueueBackingStoreFactory { RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); try { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG); writer.writeLong(94L); writer.getFD().sync(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + 
backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } finally { writer.close(); } @@ -141,12 +158,14 @@ public class TestEventQueueBackingStoreFactory { try { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * Serialization.SIZE_OF_LONG); writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE); writer.getFD().sync(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } finally { writer.close(); } @@ -157,12 +176,14 @@ public class TestEventQueueBackingStoreFactory { RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); try { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG); writer.writeLong(2L); writer.getFD().sync(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } finally { writer.close(); } @@ -173,7 +194,7 @@ public class TestEventQueueBackingStoreFactory { FileOutputStream os = null; try { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); Assert.assertTrue(checkpoint.exists()); Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0); @@ -184,7 +205,9 @@ 
public class TestEventQueueBackingStoreFactory { os = new FileOutputStream(Serialization.getMetaDataFile(checkpoint)); meta.toBuilder().setVersion(2).build().writeDelimitedTo(os); os.flush(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } finally { os.close(); } @@ -195,12 +218,14 @@ public class TestEventQueueBackingStoreFactory { RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); try { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); writer.seek(EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID * Serialization.SIZE_OF_LONG); writer.writeLong(2L); writer.getFD().sync(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } finally { writer.close(); } @@ -211,7 +236,7 @@ public class TestEventQueueBackingStoreFactory { FileOutputStream os = null; try { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); Assert.assertTrue(checkpoint.exists()); Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0); @@ -223,7 +248,9 @@ public class TestEventQueueBackingStoreFactory { Serialization.getMetaDataFile(checkpoint)); meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os); os.flush(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } finally { os.close(); } @@ -232,7 +259,7 @@ public class 
TestEventQueueBackingStoreFactory { @Test(expected = BadCheckpointException.class) public void testTruncateMeta() throws Exception { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); Assert.assertTrue(checkpoint.exists()); File metaFile = Serialization.getMetaDataFile(checkpoint); @@ -241,13 +268,15 @@ public class TestEventQueueBackingStoreFactory { writer.setLength(0); writer.getFD().sync(); writer.close(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } @Test(expected = InvalidProtocolBufferException.class) public void testCorruptMeta() throws Throwable { EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")); backingStore.close(); Assert.assertTrue(checkpoint.exists()); File metaFile = Serialization.getMetaDataFile(checkpoint); @@ -258,7 +287,9 @@ public class TestEventQueueBackingStoreFactory { writer.getFD().sync(); writer.close(); try { - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get( + checkpoint, 10, "test", new FileChannelCounter("test") + ); } catch (BadCheckpointException ex) { throw ex.getCause(); } http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java index d0237db..e2d1ee6 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java @@ -31,7 +31,10 @@ import org.mockito.stubbing.Answer; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertFalse; @@ -230,6 +233,70 @@ public class TestFileChannelErrorMetrics extends TestFileChannelBase { assertFalse(channel.getChannelCounter().isOpen()); } + @Test + public void testCheckpointBackupWriteErrorShouldIncreaseCounter() + throws IOException, InterruptedException { + FileChannelCounter fileChannelCounter = new FileChannelCounter("test"); + File checkpointFile = File.createTempFile("checkpoint", ".tmp"); + File backupDir = Files.createTempDirectory("checkpoint").toFile(); + backupDir.deleteOnExit(); + checkpointFile.deleteOnExit(); + EventQueueBackingStoreFileV3 backingStoreFileV3 = new EventQueueBackingStoreFileV3( + checkpointFile, 1, "test", fileChannelCounter, backupDir,true, false + ); + + // Exception will be thrown by state check if beforeCheckpoint is not called + backingStoreFileV3.checkpoint(); + // wait for other thread to reach the error state + assertEventuallyTrue("checkpoint backup write failure should increase counter to 1", + new BooleanPredicate() { + @Override + public boolean get() { + return fileChannelCounter.getCheckpointBackupWriteErrorCount() == 1; + } + }, + 100 + ); + } + + @Test + public void testCheckpointBackupWriteErrorShouldIncreaseCounter2() + throws Exception { + int checkpointInterval = 1500; + Map config = new HashMap(); + 
config.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, String.valueOf(checkpointInterval)); + config.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true"); + final FileChannel channel = createFileChannel(Collections.unmodifiableMap(config)); + channel.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody("test".getBytes())); + tx.commit(); + tx.close(); + final long beforeCheckpointWrite = System.currentTimeMillis(); + // first checkpoint should be written successfully -> the counter should remain 0 + assertEventuallyTrue("checkpoint backup should have been written", new BooleanPredicate() { + @Override + public boolean get() { + return new File(backupDir, "checkpoint").lastModified() > beforeCheckpointWrite; + } + }, checkpointInterval * 3); + assertEquals(0, channel.getChannelCounter().getCheckpointBackupWriteErrorCount()); + FileUtils.deleteDirectory(backupDir); + tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody("test2".getBytes())); + tx.commit(); + tx.close(); + // the backup directory has been deleted so the backup checkpoint write should have been failed + assertEventuallyTrue("checkpointBackupWriteErrorCount should be 1", new BooleanPredicate() { + @Override + public boolean get() { + return channel.getChannelCounter().getCheckpointBackupWriteErrorCount() >= 1; + } + }, checkpointInterval * 3); + } + private interface BooleanPredicate { boolean get(); } http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java index f1700f9..9c7352e 100644 --- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -96,7 +97,7 @@ public class TestFlumeEventQueue { public EventQueueBackingStore get() throws Exception { Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000, - "test"); + "test", new FileChannelCounter("test")); } } }, @@ -105,7 +106,9 @@ public class TestFlumeEventQueue { @Override public EventQueueBackingStore get() throws Exception { Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); - return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, "test"); + return new EventQueueBackingStoreFileV3( + getCheckpoint(), 1000, "test", new FileChannelCounter("test") + ); } } } @@ -135,7 +138,9 @@ public class TestFlumeEventQueue { backingStore.close(); File checkpoint = backingStoreSupplier.getCheckpoint(); Assert.assertTrue(checkpoint.delete()); - backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test"); + backingStore = new EventQueueBackingStoreFileV2( + checkpoint, 1, "test", new FileChannelCounter("test") + ); queue = new FlumeEventQueue(backingStore, backingStoreSupplier.getInflightTakes(), backingStoreSupplier.getInflightPuts(), @@ -149,7 +154,9 @@ public class TestFlumeEventQueue { backingStore.close(); File checkpoint = backingStoreSupplier.getCheckpoint(); Assert.assertTrue(checkpoint.delete()); - backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test"); + backingStore = new EventQueueBackingStoreFileV2( + checkpoint, 0, "test", new FileChannelCounter("test") + ); 
queue = new FlumeEventQueue(backingStore, backingStoreSupplier.getInflightTakes(), backingStoreSupplier.getInflightPuts(), @@ -161,7 +168,9 @@ public class TestFlumeEventQueue { backingStore.close(); File checkpoint = backingStoreSupplier.getCheckpoint(); Assert.assertTrue(checkpoint.delete()); - backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test"); + backingStore = new EventQueueBackingStoreFileV2( + checkpoint, -1, "test", new FileChannelCounter("test") + ); queue = new FlumeEventQueue(backingStore, backingStoreSupplier.getInflightTakes(), backingStoreSupplier.getInflightPuts(),
