Repository: flume Updated Branches: refs/heads/trunk ed433ae1b -> fdc53f338
FLUME-3092. Extend the FileChannel's monitoring metrics This patch adds the following new metrics to the FileChannel's counters: - eventPutErrorCount: incremented if an IOException occurs during a put operation. - eventTakeErrorCount: incremented if an IOException or CorruptEventException occurs during a take operation. - checkpointWriteErrorCount: incremented if an exception occurs during a checkpoint write. - unhealthy: this flag is 0 while the channel is starting up or has started successfully (i.e. the replay ran without any problem), so the channel is capable of normal operation, and 1 if the startup failed. - closed flag: the numeric representation (1: closed, 0: open) of the negated open flag. Closes #131. Reviewers: Attila Simon, Mike Percy (Denes Arvay via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fdc53f33 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fdc53f33 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fdc53f33 Branch: refs/heads/trunk Commit: fdc53f338931b96addf06f3f2be59da128656ec0 Parents: ed433ae Author: Denes Arvay <[email protected]> Authored: Tue May 9 16:23:31 2017 +0200 Committer: Mike Percy <[email protected]> Committed: Tue May 16 12:50:10 2017 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/file/FileChannel.java | 55 +++-- .../java/org/apache/flume/channel/file/Log.java | 19 +- .../instrumentation/FileChannelCounter.java | 49 +++- .../FileChannelCounterMBean.java | 39 +++ .../flume/channel/file/TestFileChannelBase.java | 13 +- .../file/TestFileChannelErrorMetrics.java | 247 +++++++++++++++++++ .../org/apache/flume/channel/file/TestLog.java | 16 ++ 7 files changed, 412 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java 
---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index eca4620..3194592 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -39,7 +39,6 @@ import org.apache.flume.channel.file.Log.Builder; import org.apache.flume.channel.file.encryption.EncryptionConfiguration; import org.apache.flume.channel.file.encryption.KeyProvider; import org.apache.flume.channel.file.encryption.KeyProviderFactory; -import org.apache.flume.instrumentation.ChannelCounter; import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -270,6 +269,7 @@ public class FileChannel extends BasicChannelSemantics { if (channelCounter == null) { channelCounter = new FileChannelCounter(getName()); } + channelCounter.setUnhealthy(0); } @Override @@ -277,25 +277,7 @@ public class FileChannel extends BasicChannelSemantics { LOG.info("Starting {}...", this); channelCounter.start(); try { - Builder builder = new Log.Builder(); - builder.setCheckpointInterval(checkpointInterval); - builder.setMaxFileSize(maxFileSize); - builder.setMinimumRequiredSpace(minimumRequiredSpace); - builder.setQueueSize(capacity); - builder.setCheckpointDir(checkpointDir); - builder.setLogDirs(dataDirs); - builder.setChannelName(getName()); - builder.setUseLogReplayV1(useLogReplayV1); - builder.setUseFastReplay(useFastReplay); - builder.setEncryptionKeyProvider(encryptionKeyProvider); - builder.setEncryptionKeyAlias(encryptionActiveKey); - builder.setEncryptionCipherProvider(encryptionCipherProvider); - builder.setUseDualCheckpoints(useDualCheckpoints); - 
builder.setCompressBackupCheckpoint(compressBackupCheckpoint); - builder.setBackupCheckpointDir(backupCheckpointDir); - builder.setFsyncPerTransaction(fsyncPerTransaction); - builder.setFsyncInterval(fsyncInterval); - builder.setCheckpointOnClose(checkpointOnClose); + Builder builder = createLogBuilder(); log = builder.build(); log.replay(); setOpen(true); @@ -307,6 +289,7 @@ public class FileChannel extends BasicChannelSemantics { + channelNameDescriptor); } catch (Throwable t) { setOpen(false); + channelCounter.setUnhealthy(1); startupError = t; LOG.error("Failed to start the file channel " + channelNameDescriptor, t); if (t instanceof Error) { @@ -320,6 +303,31 @@ public class FileChannel extends BasicChannelSemantics { super.start(); } + @VisibleForTesting + Builder createLogBuilder() { + Builder builder = new Log.Builder(); + builder.setCheckpointInterval(checkpointInterval); + builder.setMaxFileSize(maxFileSize); + builder.setMinimumRequiredSpace(minimumRequiredSpace); + builder.setQueueSize(capacity); + builder.setCheckpointDir(checkpointDir); + builder.setLogDirs(dataDirs); + builder.setChannelName(getName()); + builder.setUseLogReplayV1(useLogReplayV1); + builder.setUseFastReplay(useFastReplay); + builder.setEncryptionKeyProvider(encryptionKeyProvider); + builder.setEncryptionKeyAlias(encryptionActiveKey); + builder.setEncryptionCipherProvider(encryptionCipherProvider); + builder.setUseDualCheckpoints(useDualCheckpoints); + builder.setCompressBackupCheckpoint(compressBackupCheckpoint); + builder.setBackupCheckpointDir(backupCheckpointDir); + builder.setFsyncPerTransaction(fsyncPerTransaction); + builder.setFsyncInterval(fsyncInterval); + builder.setCheckpointOnClose(checkpointOnClose); + builder.setChannelCounter(channelCounter); + return builder; + } + @Override public synchronized void stop() { LOG.info("Stopping {}...", this); @@ -449,12 +457,12 @@ public class FileChannel extends BasicChannelSemantics { private final FlumeEventQueue queue; private 
final Semaphore queueRemaining; private final String channelNameDescriptor; - private final ChannelCounter channelCounter; + private final FileChannelCounter channelCounter; private final boolean fsyncPerTransaction; public FileBackedTransaction(Log log, long transactionID, int transCapacity, int keepAlive, Semaphore queueRemaining, - String name, boolean fsyncPerTransaction, ChannelCounter + String name, boolean fsyncPerTransaction, FileChannelCounter counter) { this.log = log; queue = log.getFlumeEventQueue(); @@ -503,6 +511,7 @@ public class FileChannel extends BasicChannelSemantics { queue.addWithoutCommit(ptr, transactionID); success = true; } catch (IOException e) { + channelCounter.incrementEventPutErrorCount(); throw new ChannelException("Put failed due to IO error " + channelNameDescriptor, e); } finally { @@ -549,6 +558,7 @@ public class FileChannel extends BasicChannelSemantics { Event event = log.get(ptr); return event; } catch (IOException e) { + channelCounter.incrementEventTakeErrorCount(); throw new ChannelException("Take failed due to IO error " + channelNameDescriptor, e); } catch (NoopRecordException e) { @@ -556,6 +566,7 @@ public class FileChannel extends BasicChannelSemantics { "tool found. 
Will retrieve next event", e); takeList.remove(ptr); } catch (CorruptEventException ex) { + channelCounter.incrementEventTakeErrorCount(); if (fsyncPerTransaction) { throw new ChannelException(ex); } http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 5f59d97..1662a5b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -30,6 +30,7 @@ import org.apache.flume.Event; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.encryption.KeyProvider; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,6 +139,8 @@ public class Log { private final List<File> pendingDeletes = Lists.newArrayList(); + private final FileChannelCounter channelCounter; + static class Builder { private long bCheckpointInterval; private long bMinimumRequiredSpace; @@ -161,6 +164,8 @@ public class Log { private boolean checkpointOnClose = true; + private FileChannelCounter channelCounter; + boolean isFsyncPerTransaction() { return fsyncPerTransaction; } @@ -262,13 +267,18 @@ public class Log { return this; } + Builder setChannelCounter(FileChannelCounter channelCounter) { + this.channelCounter = channelCounter; + return this; + } + Log build() throws IOException { return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity, bUseDualCheckpoints, bCompressBackupCheckpoint, bCheckpointDir, bBackupCheckpointDir, 
bName, useLogReplayV1, useFastReplay, bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias, bEncryptionCipherProvider, bUsableSpaceRefreshInterval, - fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs); + fsyncPerTransaction, fsyncInterval, checkpointOnClose, channelCounter, bLogDirs); } } @@ -280,7 +290,8 @@ public class Log { @Nullable String encryptionKeyAlias, @Nullable String encryptionCipherProvider, long usableSpaceRefreshInterval, boolean fsyncPerTransaction, - int fsyncInterval, boolean checkpointOnClose, File... logDirs) + int fsyncInterval, boolean checkpointOnClose, FileChannelCounter channelCounter, + File... logDirs) throws IOException { Preconditions.checkArgument(checkpointInterval > 0, "checkpointInterval <= 0"); @@ -304,6 +315,7 @@ public class Log { Preconditions.checkArgument(logDirs.length > 0, "logDirs empty"); Preconditions.checkArgument(name != null && !name.trim().isEmpty(), "channel name should be specified"); + Preconditions.checkNotNull(channelCounter, "ChannelCounter must be not null"); this.channelNameDescriptor = "[channel=" + name + "]"; this.useLogReplayV1 = useLogReplayV1; @@ -361,6 +373,7 @@ public class Log { this.fsyncPerTransaction = fsyncPerTransaction; this.fsyncInterval = fsyncInterval; this.checkpointOnClose = checkpointOnClose; + this.channelCounter = channelCounter; logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length); workerExecutor = Executors.newSingleThreadScheduledExecutor(new @@ -1221,8 +1234,10 @@ public class Log { log.writeCheckpoint(); } } catch (IOException e) { + log.channelCounter.incrementCheckpointWriteErrorCount(); LOG.error("Error doing checkpoint", e); } catch (Throwable e) { + log.channelCounter.incrementCheckpointWriteErrorCount(); LOG.error("General error in checkpoint worker", e); } } 
http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java index 1cd1ba8..40470a8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java @@ -23,9 +23,15 @@ import org.apache.flume.instrumentation.ChannelCounter; public class FileChannelCounter extends ChannelCounter implements FileChannelCounterMBean { private boolean open; + private int unhealthy; + + private static final String EVENT_PUT_ERROR_COUNT = "channel.file.event.put.error"; + private static final String EVENT_TAKE_ERROR_COUNT = "channel.file.event.take.error"; + private static final String CHECKPOINT_WRITE_ERROR_COUNT = "channel.file.checkpoint.write.error"; public FileChannelCounter(String name) { - super(name); + super(name, new String[] { + EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT, CHECKPOINT_WRITE_ERROR_COUNT }); } @Override @@ -36,4 +42,45 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou public void setOpen(boolean open) { this.open = open; } + + @Override + public int getClosed() { + return open ? 
0 : 1; + } + + @Override + public int getUnhealthy() { + return unhealthy; + } + + public void setUnhealthy(int unhealthy) { + this.unhealthy = unhealthy; + } + + @Override + public long getEventPutErrorCount() { + return get(EVENT_PUT_ERROR_COUNT); + } + + public void incrementEventPutErrorCount() { + increment(EVENT_PUT_ERROR_COUNT); + } + + @Override + public long getEventTakeErrorCount() { + return get(EVENT_TAKE_ERROR_COUNT); + } + + public void incrementEventTakeErrorCount() { + increment(EVENT_TAKE_ERROR_COUNT); + } + + @Override + public long getCheckpointWriteErrorCount() { + return get(CHECKPOINT_WRITE_ERROR_COUNT); + } + + public void incrementCheckpointWriteErrorCount() { + increment(CHECKPOINT_WRITE_ERROR_COUNT); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java index a193c0c..175b1f4 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java @@ -18,9 +18,48 @@ */ package org.apache.flume.channel.file.instrumentation; +import org.apache.flume.Event; import org.apache.flume.instrumentation.ChannelCounterMBean; public interface FileChannelCounterMBean extends ChannelCounterMBean { boolean isOpen(); + + /** + * The numeric representation (0/1) of the negated value of the open flag. 
+ */ + int getClosed(); + + /** + * A value of 0 represents that the channel is in a healthy state: it is either starting + * up (i.e. the replay is running) or already started up successfully. + * A value of 1 represents that the channel is in a permanently failed state, which means that + * the startup was unsuccessful due to an exception during the replay. + * Once the channel started up successfully the *ErrorCount (or the ratio of the *AttemptCount + * and *SuccessCount) counters should be used to check whether it is functioning properly. + * + * Note: this flag doesn't report the channel as unhealthy if the configuration failed because the + * ChannelCounter might not have been instantiated/started yet. + */ + int getUnhealthy(); + + /** + * A count of the number of IOExceptions encountered while trying to put() onto the channel. + * @see org.apache.flume.channel.file.FileChannel.FileBackedTransaction#doPut(Event) + */ + long getEventPutErrorCount(); + + /** + * A count of the number of errors encountered while trying to take() from the channel, + * including IOExceptions and corruption-related errors. + * @see org.apache.flume.channel.file.FileChannel.FileBackedTransaction#doTake() + */ + long getEventTakeErrorCount(); + + /** + * A count of the number of errors encountered while trying to write the checkpoints. This + * includes any Throwables. 
+ * @see org.apache.flume.channel.file.Log.BackgroundWorker#run() + */ + long getCheckpointWriteErrorCount(); } http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java index 9901b69..6f5981a 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java @@ -22,6 +22,7 @@ import java.io.File; import java.util.HashMap; import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; import org.apache.flume.Context; import org.junit.After; @@ -32,6 +33,7 @@ import com.google.common.io.Files; public class TestFileChannelBase { + private final int dataDirCount; protected FileChannel channel; protected File baseDir; protected File checkpointDir; @@ -41,6 +43,15 @@ public class TestFileChannelBase { protected File uncompressedBackupCheckpoint; protected File compressedBackupCheckpoint; + public TestFileChannelBase() { + this(3); // By default the tests run with multiple data directories + } + + public TestFileChannelBase(int dataDirCount) { + Preconditions.checkArgument(dataDirCount > 0, "Invalid dataDirCount"); + this.dataDirCount = dataDirCount; + } + @Before public void setup() throws Exception { baseDir = Files.createTempDir(); @@ -51,7 +62,7 @@ public class TestFileChannelBase { "checkpoint.snappy"); Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory()); Assert.assertTrue(backupDir.mkdirs() || backupDir.isDirectory()); - dataDirs = new 
File[3]; + dataDirs = new File[dataDirCount]; dataDir = ""; for (int i = 0; i < dataDirs.length; i++) { dataDirs[i] = new File(baseDir, "data" + (i + 1)); http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java new file mode 100644 index 0000000..d0237db --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.channel.file; + +import junit.framework.Assert; +import org.apache.commons.io.FileUtils; +import org.apache.flume.ChannelException; +import org.apache.flume.Transaction; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; +import org.apache.flume.event.EventBuilder; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; + +public class TestFileChannelErrorMetrics extends TestFileChannelBase { + + public TestFileChannelErrorMetrics() { + // use only 1 data directory in order to make it simpler to edit the data files + // in testCorruptEventTaken() and testUnhealthy() methods + super(1); + } + + /** + * This tests multiple successful and failed put and take operations + * and checks the values of the channel's counters. 
+ */ + @Test + public void testEventTakePutErrorCount() throws Exception { + final long usableSpaceRefreshInterval = 1; + FileChannel channel = Mockito.spy(createFileChannel()); + Mockito.when(channel.createLogBuilder()).then(new Answer<Log.Builder>() { + @Override + public Log.Builder answer(InvocationOnMock invocation) throws Throwable { + Log.Builder ret = (Log.Builder) invocation.callRealMethod(); + ret.setUsableSpaceRefreshInterval(usableSpaceRefreshInterval); + return ret; + } + }); + channel.start(); + + FileChannelCounter channelCounter = channel.getChannelCounter(); + + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody("test1".getBytes())); + channel.put(EventBuilder.withBody("test2".getBytes())); + tx.commit(); + tx.close(); + assertEquals(2, channelCounter.getEventPutAttemptCount()); + assertEquals(2, channelCounter.getEventPutSuccessCount()); + assertEquals(0, channelCounter.getEventPutErrorCount()); + + tx = channel.getTransaction(); + tx.begin(); + channel.take(); + tx.commit(); + tx.close(); + assertEquals(1, channelCounter.getEventTakeAttemptCount()); + assertEquals(1, channelCounter.getEventTakeSuccessCount()); + assertEquals(0, channelCounter.getEventTakeErrorCount()); + + FileUtils.deleteDirectory(baseDir); + Thread.sleep(2 * usableSpaceRefreshInterval); + + tx = channel.getTransaction(); + tx.begin(); + ChannelException putException = null; + try { + channel.put(EventBuilder.withBody("test".getBytes())); + } catch (ChannelException ex) { + putException = ex; + } + assertNotNull(putException); + assertTrue(putException.getCause() instanceof IOException); + assertEquals(3, channelCounter.getEventPutAttemptCount()); + assertEquals(2, channelCounter.getEventPutSuccessCount()); + assertEquals(1, channelCounter.getEventPutErrorCount()); + + ChannelException takeException = null; + try { + channel.take(); // This is guaranteed to throw an error if the above put() threw an error. 
+ } catch (ChannelException ex) { + takeException = ex; + } + assertNotNull(takeException); + assertTrue(takeException.getCause() instanceof IOException); + assertEquals(2, channelCounter.getEventTakeAttemptCount()); + assertEquals(1, channelCounter.getEventTakeSuccessCount()); + assertEquals(1, channelCounter.getEventTakeErrorCount()); + } + + /** + * Test the FileChannelCounter.eventTakeErrorCount value if the data file + * contains an invalid record thus CorruptEventException is thrown during + * the take() operation. + * The first byte of the record (= the first byte of the file in this case) + * is the operation byte, changing it to an unexpected value will cause the + * CorruptEventException to be thrown. + */ + @Test + public void testCorruptEventTaken() throws Exception { + FileChannel channel = createFileChannel( + Collections.singletonMap(FileChannelConfiguration.FSYNC_PER_TXN, "false")); + channel.start(); + + FileChannelCounter channelCounter = channel.getChannelCounter(); + + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody("test".getBytes())); + tx.commit(); + tx.close(); + + byte[] data = FileUtils.readFileToByteArray(new File(dataDirs[0], "log-1")); + data[0] = LogFile.OP_EOF; // change the first (operation) byte to an unexpected value + FileUtils.writeByteArrayToFile(new File(dataDirs[0], "log-1"), data); + + tx = channel.getTransaction(); + tx.begin(); + + try { + channel.take(); + } catch (Throwable t) { + // If fsyncPerTransaction is false then Log.get throws the CorruptEventException + // without wrapping it in an IOException (which is the case when fsyncPerTransaction is true) + // but in this case it is swallowed in FileBackedTransaction.doTake() + // The eventTakeErrorCount should be increased regardless of this. 
+ Assert.fail("No exception should be thrown as fsyncPerTransaction is false"); + } + + assertEquals(1, channelCounter.getEventTakeAttemptCount()); + assertEquals(0, channelCounter.getEventTakeSuccessCount()); + assertEquals(1, channelCounter.getEventTakeErrorCount()); + } + + @Test + public void testCheckpointWriteErrorCount() throws Exception { + int checkpointInterval = 1500; + final FileChannel channel = createFileChannel(Collections.singletonMap( + FileChannelConfiguration.CHECKPOINT_INTERVAL, String.valueOf(checkpointInterval))); + channel.start(); + + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody("test".getBytes())); + tx.commit(); + tx.close(); + + final long beforeCheckpointWrite = System.currentTimeMillis(); + + // first checkpoint should be written successfully -> the counter should remain 0 + assertEventuallyTrue("checkpoint should have been written", new BooleanPredicate() { + @Override + public boolean get() { + return new File(checkpointDir, "checkpoint").lastModified() > beforeCheckpointWrite; + } + }, checkpointInterval * 3); + assertEquals(0, channel.getChannelCounter().getCheckpointWriteErrorCount()); + + FileUtils.deleteDirectory(baseDir); + + // the channel's directory has been deleted so the next checkpoint write should fail + assertEventuallyTrue("checkpointWriterErrorCount should be 1", new BooleanPredicate() { + @Override + public boolean get() { + return channel.getChannelCounter().getCheckpointWriteErrorCount() == 1; + } + }, checkpointInterval * 3); + } + + /** + * Test the value of the FileChannelCounter.unhealthy flag after normal startup. 
+ * It is expected to be 0 + */ + @Test + public void testHealthy() throws Exception { + FileChannel channel = createFileChannel(); + assertEquals(0, channel.getChannelCounter().getUnhealthy()); + assertEquals(1, channel.getChannelCounter().getClosed()); + assertFalse(channel.getChannelCounter().isOpen()); + + channel.start(); + assertEquals(0, channel.getChannelCounter().getUnhealthy()); + assertEquals(0, channel.getChannelCounter().getClosed()); + assertTrue(channel.getChannelCounter().isOpen()); + } + + /** + * Test the value of the FileChannelCounter.unhealthy flag after a failed startup. + * It is expected to be 1 + */ + @Test + public void testUnhealthy() throws Exception { + FileChannel channel = createFileChannel(); + assertEquals(0, channel.getChannelCounter().getUnhealthy()); + assertEquals(1, channel.getChannelCounter().getClosed()); + assertFalse(channel.getChannelCounter().isOpen()); + + FileUtils.write(new File(dataDirs[0], "log-1"), "invalid data file content"); + + channel.start(); + assertEquals(1, channel.getChannelCounter().getUnhealthy()); + assertEquals(1, channel.getChannelCounter().getClosed()); + assertFalse(channel.getChannelCounter().isOpen()); + } + + private interface BooleanPredicate { + boolean get(); + } + + private static void assertEventuallyTrue(String description, BooleanPredicate expression, + long timeoutMillis) + throws InterruptedException { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() < start + timeoutMillis) { + if (expression.get()) break; + Thread.sleep(timeoutMillis / 10); + } + assertTrue(description, expression.get()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index f7f0950..e85274d 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -21,6 +21,7 @@ package org.apache.flume.channel.file; import com.google.common.collect.Lists; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; +import org.apache.flume.channel.file.instrumentation.FileChannelCounter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -63,6 +64,7 @@ public class TestLog { .setLogDirs(dataDirs) .setCheckpointOnClose(false) .setChannelName("testlog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); } @@ -141,6 +143,7 @@ public class TestLog { .setCheckpointDir(checkpointDir) .setLogDirs(dataDirs) .setChannelName("testlog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); takeAndVerify(eventPointerIn, eventIn); @@ -163,6 +166,7 @@ public class TestLog { .setCheckpointDir(checkpointDir) .setLogDirs(dataDirs) .setChannelName("testlog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); @@ -180,6 +184,7 @@ public class TestLog { .setLogDirs(dataDirs) .setChannelName("testlog") .setMinimumRequiredSpace(Long.MAX_VALUE) + .setChannelCounter(new FileChannelCounter("testlog")) .build(); try { log.replay(); @@ -219,6 +224,7 @@ public class TestLog { .setChannelName("testlog") .setMinimumRequiredSpace(minimumRequiredSpace) .setUsableSpaceRefreshInterval(1L) + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); File filler = new File(checkpointDir, "filler"); @@ -259,6 +265,7 @@ public class TestLog { .setCheckpointDir(checkpointDir) .setLogDirs(dataDirs) .setChannelName("testlog") + .setChannelCounter(new 
FileChannelCounter("testlog")) .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); @@ -298,6 +305,7 @@ public class TestLog { .setLogDirs(dataDirs) .setChannelName("testlog") .setUseLogReplayV1(useLogReplayV1) + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); takeAndVerify(eventPointerIn, eventIn); @@ -314,6 +322,7 @@ public class TestLog { .setCheckpointDir(checkpointDir) .setLogDirs(dataDirs) .setChannelName("testlog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); @@ -332,6 +341,7 @@ public class TestLog { .setCheckpointDir(checkpointDir) .setLogDirs(dataDirs) .setChannelName("testlog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); @@ -350,6 +360,7 @@ public class TestLog { .setCheckpointDir(checkpointDir) .setLogDirs(dataDirs) .setChannelName("testlog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); @@ -402,6 +413,7 @@ public class TestLog { .setLogDirs(dataDirs) .setChannelName("testlog") .setUseFastReplay(useFastReplay) + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay(); FlumeEvent eventIn = TestUtils.newPersistableEvent(); @@ -430,6 +442,7 @@ public class TestLog { .setLogDirs(dataDirs) .setChannelName("testlog") .setUseFastReplay(useFastReplay) + .setChannelCounter(new FileChannelCounter("testlog")) .build(); try { log.replay(); @@ -455,6 +468,7 @@ public class TestLog { .setCheckpointDir(checkpointDir) .setLogDirs(dataDirs) .setChannelName("testlog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer); } @@ -477,6 +491,7 @@ public class TestLog { .setLogDirs(dataDirs) .setChannelName("testlog") .setUseFastReplay(true) + .setChannelCounter(new 
FileChannelCounter("testlog")) .build(); doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer); } @@ -529,6 +544,7 @@ public class TestLog { .setLogDirs(dataDirs) .setCheckpointOnClose(true) .setChannelName("testLog") + .setChannelCounter(new FileChannelCounter("testlog")) .build(); log.replay();
