Updated Branches:
  refs/heads/trunk 52bd16ba9 -> ff37f5a54

FLUME-1560. FileChannel tests which fill up the channel should use larger batch size than 1

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ff37f5a5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ff37f5a5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ff37f5a5

Branch: refs/heads/trunk
Commit: ff37f5a541769908f4b518c369590fd8685cea22
Parents: 52bd16b
Author: Hari Shreedharan <[email protected]>
Authored: Mon Sep 10 19:31:14 2012 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Mon Sep 10 19:31:14 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/TestFileChannel.java |   34 +-------
 .../flume/channel/file/TestFileChannelRestart.java |   14 +---
 .../org/apache/flume/channel/file/TestUtils.java   |   60 ++++++++++++++-
 .../file/encryption/TestFileChannelEncryption.java |   44 ++---------
 4 files changed, 69 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ff37f5a5/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 16157d5..8baf8fe 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -219,13 +219,7 @@ public class TestFileChannel extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    try {
-      putEvents(channel, "fillup", 1, Integer.MAX_VALUE);
-      Assert.fail();
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]",
-              e.getMessage());
-    }
+    fillChannel(channel, "fillup");
     // take an event, roll it back, and
     // then make sure a put fails
     Transaction transaction;
@@ -236,13 +230,7 @@ public class TestFileChannel extends TestFileChannelBase {
     transaction.rollback();
     transaction.close();
     // ensure the take the didn't change the state of the capacity
-    try {
-      putEvents(channel, "capacity", 1, 1);
-      Assert.fail();
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]",
-              e.getMessage());
-    }
+    Assert.assertEquals(0, fillChannel(channel, "capacity").size());
     // ensure we the events back
     Assert.assertEquals(5, takeEvents(channel, 1, 5).size());
   }
@@ -267,13 +255,7 @@ public class TestFileChannel extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    try {
-      putEvents(channel, "fillup", 1, Integer.MAX_VALUE);
-      Assert.fail();
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]",
-              e.getMessage());
-    }
+    fillChannel(channel, "fillup");
     // then do a put which will block but it will be assigned a tx id
     Future<String> put = Executors.newSingleThreadExecutor()
             .submit(new Callable<String>() {
@@ -395,15 +377,7 @@ public class TestFileChannel extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> in = Sets.newHashSet();
-    try {
-      while (true) {
-        in.addAll(putEvents(channel, "restart", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="
-              + channel.getName() + "]", e.getMessage());
-    }
+    Set<String> in = fillChannel(channel, "restart");
     Set<String> out = Sets.newHashSet();
     // now take one item off the channel
     Transaction tx = channel.getTransaction();

http://git-wip-us.apache.org/repos/asf/flume/blob/ff37f5a5/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 90d5aed..4133573 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.flume.ChannelException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 public class TestFileChannelRestart extends TestFileChannelBase {
   protected static final Logger LOG = LoggerFactory
@@ -89,15 +87,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> in = Sets.newHashSet();
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "restart", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="
-          +channel.getName()+"]", e.getMessage());
-    }
+    Set<String> in = fillChannel(channel, "restart");
     if (forceCheckpoint) {
       forceCheckpoint(channel);
     }
@@ -111,7 +101,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/ff37f5a5/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 8a9f10f..2b88b96 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -35,6 +35,7 @@ import java.util.UUID;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
@@ -73,6 +74,7 @@ public class TestUtils {
   public static void compareInputAndOut(Set<String> in, Set<String> out) {
     Assert.assertNotNull(in);
     Assert.assertNotNull(out);
+    Assert.assertEquals(in.size(), out.size());
     Assert.assertTrue(in.equals(out));
   }
 
@@ -148,23 +150,74 @@ public class TestUtils {
     }
     return result;
   }
-
+  public static Set<String> consumeChannel(Channel channel)
+      throws Exception {
+    Set<String> result = Sets.newHashSet();
+    int[] batchSizes = new int[] {
+        1000, 100, 10, 1
+    };
+    for (int i = 0; i < batchSizes.length; i++) {
+      while(true) {
+        Set<String> batch = takeEvents(channel, batchSizes[i]);
+        if(batch.isEmpty()) {
+          break;
+        }
+        result.addAll(batch);
+      }
+    }
+    return result;
+  }
+  public static Set<String> fillChannel(Channel channel, String prefix)
+      throws Exception {
+    Set<String> result = Sets.newHashSet();
+    int[] batchSizes = new int[] {
+        1000, 100, 10, 1
+    };
+    for (int i = 0; i < batchSizes.length; i++) {
+      try {
+        while(true) {
+          Set<String> batch = putEvents(channel, prefix, batchSizes[i],
+              Integer.MAX_VALUE, true);
+          if(batch.isEmpty()) {
+            break;
+          }
+          result.addAll(batch);
+        }
+      } catch (ChannelException e) {
+        Assert.assertEquals("Cannot acquire capacity. [channel="
+            +channel.getName()+"]", e.getMessage());
+      }
+    }
+    return result;
+  }
   public static Set<String> putEvents(Channel channel, String prefix,
-          int batchSize, int numEvents) throws Exception {
+      int batchSize, int numEvents) throws Exception {
+    return putEvents(channel, prefix, batchSize, numEvents, false);
+  }
+  public static Set<String> putEvents(Channel channel, String prefix,
+          int batchSize, int numEvents, boolean untilCapacityIsReached)
+              throws Exception {
     Set<String> result = Sets.newHashSet();
     for (int i = 0; i < numEvents; i += batchSize) {
       Transaction transaction = channel.getTransaction();
       transaction.begin();
       try {
+        Set<String> batch = Sets.newHashSet();
         for (int j = 0; j < batchSize; j++) {
           String s = prefix + "-" + i + "-" + j + "-" + UUID.randomUUID();
           Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
-          result.add(s);
           channel.put(event);
+          batch.add(s);
         }
         transaction.commit();
+        result.addAll(batch);
       } catch (Exception ex) {
         transaction.rollback();
+        if(untilCapacityIsReached && ex instanceof ChannelException &&
+            ("Cannot acquire capacity. [channel=" +channel.getName() + "]").
+              equals(ex.getMessage())) {
+          break;
+        }
         throw ex;
       } finally {
         transaction.close();
@@ -185,6 +238,7 @@ public class TestUtils {
     context.put(FileChannelConfiguration.CHECKPOINT_DIR,
             checkpointDir);
     context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
+    context.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(1));
     context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
     // Set checkpoint for 5 seconds otherwise test will run out of memory
     context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000");

http://git-wip-us.apache.org/repos/asf/flume/blob/ff37f5a5/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
index 493dac7..5f3a23d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 
 public class TestFileChannelEncryption extends TestFileChannelBase {
@@ -90,21 +89,13 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> in = Sets.newHashSet();
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "restart", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="
-          +channel.getName()+"]", e.getMessage());
-    }
+    Set<String> in = fillChannel(channel, "restart");
     channel.stop();
     channel = TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
         dataDir, overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    Set<String> out =  consumeChannel(channel);
     compareInputAndOut(in, out);
   }
   @Test
@@ -113,15 +104,7 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> in = Sets.newHashSet();
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "will-not-restart", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="
-          +channel.getName()+"]", e.getMessage());
-    }
+    fillChannel(channel, "will-not-restart");
     channel.stop();
     Map<String, String> noEncryptionOverrides = getOverrides();
     channel = createFileChannel(noEncryptionOverrides);
@@ -142,15 +125,7 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     channel = createFileChannel(noEncryptionOverrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Set<String> in = Sets.newHashSet();
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "unencrypted-and-encrypted", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="
-          +channel.getName()+"]", e.getMessage());
-    }
+    Set<String> in = fillChannel(channel, "unencrypted-and-encrypted");
     int numEventsToRemove = in.size() / 2;
     for (int i = 0; i < numEventsToRemove; i++) {
       Assert.assertTrue(in.removeAll(takeEvents(channel, 1, 1)));
@@ -161,15 +136,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "unencrypted-and-encrypted", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="
-          +channel.getName()+"]", e.getMessage());
-    }
-    Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    in.addAll(fillChannel(channel, "unencrypted-and-encrypted"));
+    Set<String> out = consumeChannel(channel);
     compareInputAndOut(in, out);
   }
   @Test

Reply via email to