FLUME-1571: Channels should check for positive capacity and transaction capacity values

(Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/09dfc2ab
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/09dfc2ab
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/09dfc2ab

Branch: refs/heads/flume-1.3.0
Commit: 09dfc2ab77ab898fd38094b3c8c2358f7fd55ffd
Parents: 16b9cb5
Author: Brock Noland <[email protected]>
Authored: Tue Dec 11 14:10:15 2012 -0600
Committer: Hari Shreedharan <[email protected]>
Committed: Thu Dec 20 00:13:43 2012 -0800

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java | 33 +++++++++++++--
 .../channel/file/TestCheckpointRebuilder.java      | 2 +
 .../apache/flume/channel/file/TestFileChannel.java | 30 +++++++++++++
 .../file/TestFileChannelFormatRegression.java      | 2 +
 .../flume/channel/file/TestFileChannelRestart.java | 1 +
 .../org/apache/flume/channel/file/TestUtils.java   | 16 ++++++-
 .../file/encryption/TestFileChannelEncryption.java | 6 +++
 .../org/apache/flume/channel/MemoryChannel.java    | 18 ++++++++-
 .../apache/flume/channel/TestMemoryChannel.java    | 22 ++++++++++
 9 files changed, 122 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 4bf480b..19c91b0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -72,10
+72,10 @@ public class FileChannel extends BasicChannelSemantics { private static final Logger LOG = LoggerFactory .getLogger(FileChannel.class); - private int capacity; + private Integer capacity = 0; private int keepAlive; - private int transactionCapacity; - private long checkpointInterval; + private Integer transactionCapacity = 0; + private Long checkpointInterval = 0L; private long maxFileSize; private long minimumRequiredSpace; private File checkpointDir; @@ -147,6 +147,11 @@ public class FileChannel extends BasicChannelSemantics { int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY, FileChannelConfiguration.DEFAULT_CAPACITY); + if(newCapacity <= 0 && capacity == 0) { + newCapacity = FileChannelConfiguration.DEFAULT_CAPACITY; + LOG.warn("Invalid capacity specified, initializing channel to " + + "default capacity of {}", newCapacity); + } if(capacity > 0 && newCapacity != capacity) { LOG.warn("Capacity of this channel cannot be sized on the fly due " + "the requirement we have enough DirectMemory for the queue and " + @@ -163,9 +168,29 @@ public class FileChannel extends BasicChannelSemantics { context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY, FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY); + if(transactionCapacity <= 0) { + transactionCapacity = + FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY; + LOG.warn("Invalid transaction capacity specified, " + + "initializing channel to default " + + "capacity of {}", transactionCapacity); + } + + Preconditions.checkState(transactionCapacity <= capacity, + "File Channel transaction capacity cannot be greater than the " + + "capacity of the channel."); + checkpointInterval = - context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL, + context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL, FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL); + if (checkpointInterval <= 0) { + LOG.warn("Checkpoint interval is invalid: " + checkpointInterval + + ", using 
default: " + + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL); + + checkpointInterval = + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL; + } // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE maxFileSize = Math.min( http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java index ffc4623..536af54 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java @@ -52,6 +52,8 @@ public class TestCheckpointRebuilder extends TestFileChannelBase { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(50)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(50)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 87a0a3f..0f7d14d 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -67,6 +67,28 @@ public class TestFileChannel extends TestFileChannelBase { public void teardown() { super.teardown(); } + + @Test + public void testNegativeCapacities() { + Map<String, String> parms = Maps.newHashMap(); + parms.put(FileChannelConfiguration.CAPACITY, "-3"); + parms.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "-1"); + parms.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "-2"); + FileChannel channel = createFileChannel(parms); + + Assert.assertTrue(field("capacity") + .ofType(Integer.class) + .in(channel).get() > 0); + + Assert.assertTrue(field("transactionCapacity") + .ofType(Integer.class) + .in(channel).get() > 0); + + Assert.assertTrue(field("checkpointInterval") + .ofType(Long.class) + .in(channel).get() > 0); + } + @Test public void testFailAfterTakeBeforeCommit() throws Throwable { final FileChannel channel = createFileChannel(); @@ -223,6 +245,8 @@ public class TestFileChannel extends TestFileChannelBase { public void testCapacity() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(5)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(5)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -259,6 +283,8 @@ public class TestFileChannel extends TestFileChannelBase { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(10L)); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(10)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -409,6 +435,8 @@ public class TestFileChannel extends TestFileChannelBase { public void testPutForceCheckpointCommitReplay() throws 
Exception{ Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(2)); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); FileChannel channel = createFileChannel(overrides); channel.start(); @@ -433,6 +461,8 @@ public class TestFileChannel extends TestFileChannelBase { public void testPutCheckpointCommitCheckpointReplay() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(2)); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); FileChannel channel = createFileChannel(overrides); channel.start(); http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java index 184f956..c95122b 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java @@ -65,6 +65,8 @@ public class TestFileChannelFormatRegression extends TestFileChannelBase { } Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(10)); channel = createFileChannel(overrides); channel.start(); 
Assert.assertTrue(channel.isOpen()); http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index f548f31..3d5bf59 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -371,6 +371,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, "10"); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10"); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 9978f86..ba653e6 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -142,7 +142,15 @@ public class TestUtils { try { transaction.begin(); for (int j = 0; j < batchSize; j++) { - Event event = channel.take(); + Event event = null; + try { + event = 
channel.take(); + } catch (ChannelException ex) { + Assert.assertTrue(ex.getMessage().startsWith( + "Take list for FileBackedTransaction, capacity")); + transaction.commit(); + return result; + } if (event == null) { transaction.commit(); return result; @@ -194,11 +202,13 @@ public class TestUtils { result.addAll(batch); } } catch (ChannelException e) { - Assert.assertEquals("The channel has reached it's capacity. This might " + Assert.assertTrue(("The channel has reached it's capacity. This might " + "be the result of a sink on the channel having too low of batch " + "size, a downstream system running slower than normal, or that " + "the channel capacity is just too low. [channel=" - + channel.getName()+"]", e.getMessage()); + + channel.getName() + "]").equals(e.getMessage()) + || e.getMessage().startsWith("Put queue for FileBackedTransaction " + + "of capacity ")); } } return result; http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java index d2f5208..6ea1216 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java @@ -72,6 +72,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase { private Map<String, String> getOverrides() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(100)); + 
overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(100)); return overrides; } private Map<String, String> getOverridesForEncryption() throws Exception { @@ -98,6 +100,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase { int numThreads = 20; Map<String, String> overrides = getOverridesForEncryption(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(100)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -133,6 +137,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase { int numThreads = 20; Map<String, String> overrides = getOverridesForEncryption(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, + String.valueOf(100)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index dfc289e..a25e639 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -224,15 +224,31 @@ public class MemoryChannel extends BasicChannelSemantics { capacity = context.getInteger("capacity", defaultCapacity); } catch(NumberFormatException e) { capacity = defaultCapacity; + LOGGER.warn("Invalid capacity specified, initializing channel to " + + "default capacity of {}", defaultCapacity); } + if (capacity <= 0) { + capacity = defaultCapacity; + LOGGER.warn("Invalid capacity specified, 
initializing channel to " + + "default capacity of {}", defaultCapacity); + } try { transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); } catch(NumberFormatException e) { transCapacity = defaultTransCapacity; + LOGGER.warn("Invalid transation capacity specified, initializing channel" + + " to default capacity of {}", defaultTransCapacity); } - Preconditions.checkState(transCapacity <= capacity); + if (transCapacity <= 0) { + transCapacity = defaultTransCapacity; + LOGGER.warn("Invalid transation capacity specified, initializing channel" + + " to default capacity of {}", defaultTransCapacity); + } + Preconditions.checkState(transCapacity <= capacity, + "Transaction Capacity of Memory Channel cannot be higher than " + + "the capacity."); try { byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index e1a61c2..a78581a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java @@ -21,6 +21,7 @@ package org.apache.flume.channel; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; import org.apache.flume.Channel; import org.apache.flume.ChannelException; @@ -33,6 +34,8 @@ import org.apache.flume.event.EventBuilder; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.fest.reflect.core.Reflection.*; + public class TestMemoryChannel { @@ -439,4 +442,23 @@ public class TestMemoryChannel { } + + @Test + public void 
testNegativeCapacities() { + Context context = new Context(); + Map<String, String> parms = new HashMap<String, String>(); + parms.put("capacity", "-3"); + parms.put("transactionCapacity", "-1"); + context.putAll(parms); + Configurables.configure(channel, context); + + Assert.assertTrue(field("queue") + .ofType(LinkedBlockingDeque.class) + .in(channel).get() + .remainingCapacity() > 0); + + Assert.assertTrue(field("transCapacity") + .ofType(Integer.class) + .in(channel).get() > 0); + } }
