Updated Branches: refs/heads/trunk 59f613b85 -> e4950a6d4
FLUME-1599. Improve FileChannel error messages and javadocs. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e4950a6d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e4950a6d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e4950a6d Branch: refs/heads/trunk Commit: e4950a6d4dfe937436399c4527db08e276474168 Parents: 59f613b Author: Hari Shreedharan <[email protected]> Authored: Mon Sep 24 12:14:08 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Sep 24 12:14:08 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 46 +++++---------- .../channel/file/FileChannelConfiguration.java | 5 +- .../java/org/apache/flume/channel/file/Log.java | 7 +- 3 files changed, 22 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e4950a6d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index c2e904b..09a2a18 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -51,21 +51,7 @@ import com.google.common.base.Strings; * <p> * FileChannel works by writing all transactions to a set of directories * specified in the configuration. Additionally, when a commit occurs - * the transaction is synced to disk. Pointers to events put on the - * channel are stored in memory. As such, each event on the queue - * will require 8 bytes of DirectMemory (non-heap). However, the channel - * will only allow a configurable number messages into the channel. - * The appropriate amount of direct memory for said capacity, - * must be allocated to the JVM via the JVM property: -XX:MaxDirectMemorySize - * </p> - * <br> - * <p> - * Memory Consumption: - * <ol> - * <li>200GB of data in queue at 100 byte messages: 16GB</li> - * <li>200GB of data in queue at 500 byte messages: 3.2GB</li> - * <li>200GB of data in queue at 1000 byte messages: 1.6GB</li> - * </ol> + * the transaction is synced to disk. * </p> */ public class FileChannel extends BasicChannelSemantics { @@ -408,9 +394,9 @@ public class FileChannel extends BasicChannelSemantics { boolean lockAcquired = log.tryLockShared(); try { if(!lockAcquired) { - throw new ChannelException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); + throw new ChannelException("Failed to obtain lock for writing to the " + + "log. Try increasing the log write timeout value. " + + channelNameDescriptor); } FlumeEventPointer ptr = log.put(transactionID, event); Preconditions.checkState(putList.offer(ptr), "putList offer failed " @@ -442,9 +428,9 @@ public class FileChannel extends BasicChannelSemantics { + channelNameDescriptor); } if(!log.tryLockShared()) { - throw new ChannelException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); + throw new ChannelException("Failed to obtain lock for writing to the " + + "log. Try increasing the log write timeout value. " + + channelNameDescriptor); } try { FlumeEventPointer ptr = queue.removeHead(transactionID); @@ -475,9 +461,9 @@ public class FileChannel extends BasicChannelSemantics { Preconditions.checkState(takes == 0, "nonzero puts and takes " + channelNameDescriptor); if(!log.tryLockShared()) { - throw new ChannelException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); + throw new ChannelException("Failed to obtain lock for writing to the " + + "log. Try increasing the log write timeout value. " + + channelNameDescriptor); } try { log.commitPut(transactionID); @@ -507,9 +493,9 @@ public class FileChannel extends BasicChannelSemantics { } else if (takes > 0) { if(!log.tryLockShared()) { - throw new ChannelException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); + throw new ChannelException("Failed to obtain lock for writing to the " + + "log. Try increasing the log write timeout value. " + + channelNameDescriptor); } try { log.commitTake(transactionID); @@ -534,9 +520,9 @@ public class FileChannel extends BasicChannelSemantics { boolean lockAcquired = log.tryLockShared(); try { if(!lockAcquired) { - throw new ChannelException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. " + channelNameDescriptor); + throw new ChannelException("Failed to obtain lock for writing to the " + + "log. Try increasing the log write timeout value. " + + channelNameDescriptor); } log.rollback(transactionID); if(takes > 0) { http://git-wip-us.apache.org/repos/asf/flume/blob/e4950a6d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java index be2f633..92cad77 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java @@ -46,9 +46,8 @@ public class FileChannelConfiguration { public static final long DEFAULT_MAX_FILE_SIZE = Integer.MAX_VALUE - (500L * 1024L * 1024L); // ~1.52 G /** - * Maximum capacity of the channel. This number needs to be configured - * in line with -XX:MaxDirectMemorySize. {@link FileChannel} - * Default: 1,000,000 which will consume 8MB of direct memory + * Maximum capacity of the channel. + * Default: 1,000,000 */ public static final String CAPACITY = "capacity"; public static final int DEFAULT_CAPACITY = 1000000; http://git-wip-us.apache.org/repos/asf/flume/blob/e4950a6d/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 1072259..64725dd 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.annotation.Nullable; +import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.channel.file.encryption.KeyProvider; import org.slf4j.Logger; @@ -710,9 +711,9 @@ class Log { private synchronized void roll(int index, ByteBuffer buffer) throws IOException { if (!tryLockShared()) { - throw new IOException("Failed to obtain lock for writing to the log. " - + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0. "+ channelNameDescriptor); + throw new ChannelException("Failed to obtain lock for writing to the " + + "log. Try increasing the log write timeout value. " + + channelNameDescriptor); } try {
