Updated Branches:
  refs/heads/trunk f269c69bc -> f26c79f6d

FLUME-1543. TestFileChannel should be factored into many tests.

(Brock Noland via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f26c79f6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f26c79f6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f26c79f6

Branch: refs/heads/trunk
Commit: f26c79f6d32b7893e0d60326e6992faa8ead0785
Parents: f269c69
Author: Mike Percy <[email protected]>
Authored: Sat Sep 8 09:10:17 2012 -0700
Committer: Mike Percy <[email protected]>
Committed: Sat Sep 8 09:10:17 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/TestFileChannel.java |  296 +--------------
 .../flume/channel/file/TestFileChannelBase.java    |   82 ++++
 .../file/TestFileChannelFormatRegression.java      |  113 ++++++
 .../flume/channel/file/TestFileChannelRestart.java |  153 ++++++++
 .../channel/file/TestFileChannelRollback.java      |  139 +++++++
 5 files changed, 490 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index e2abc27..7c1aaab 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -20,12 +20,9 @@ package org.apache.flume.channel.file;
 
 import static org.apache.flume.channel.file.TestUtils.*;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -38,14 +35,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.sink.LoggerSink;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -53,56 +46,22 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.Files;
 
-public class TestFileChannel {
+public class TestFileChannel extends TestFileChannelBase {
 
   private static final Logger LOG = LoggerFactory
           .getLogger(TestFileChannel.class);
-  private FileChannel channel;
-  private File baseDir;
-  private File checkpointDir;
-  private File[] dataDirs;
-  private String dataDir;
 
   @Before
   public void setup() {
-    baseDir = Files.createTempDir();
-    checkpointDir = new File(baseDir, "chkpt");
-    Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
-    dataDirs = new File[3];
-    dataDir = "";
-    for (int i = 0; i < dataDirs.length; i++) {
-      dataDirs[i] = new File(baseDir, "data" + (i+1));
-      Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory());
-      dataDir += dataDirs[i].getAbsolutePath() + ",";
-    }
-    dataDir = dataDir.substring(0, dataDir.length() - 1);
-    channel = createFileChannel();
+    super.setup();
   }
   @After
   public void teardown() {
-    if(channel != null && channel.isOpen()) {
-      channel.stop();
-    }
-    FileUtils.deleteQuietly(baseDir);
-  }
-  private Context createContext() {
-    return createContext(new HashMap<String, String>());
-  }
-  private Context createContext(Map<String, String> overrides) {
-    return TestUtils.createFileChannelContext(checkpointDir.getAbsolutePath(),
-        dataDir, overrides);
-  }
-  private FileChannel createFileChannel() {
-    return createFileChannel(new HashMap<String, String>());
-  }
-  private FileChannel createFileChannel(Map<String, String> overrides) {
-    return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), dataDir, overrides);
+    super.teardown();
   }
   @Test
   public void testFailAfterTakeBeforeCommit() throws Throwable {
@@ -203,108 +162,6 @@ public class TestFileChannel {
   }
 
   @Test
-  public void testRestartLogReplayV1() throws Exception {
-    doTestRestart(true, false, false, false);
-  }
-  @Test
-  public void testRestartLogReplayV2() throws Exception {
-    doTestRestart(false, false, false, false);
-  }
-
-  @Test
-  public void testFastReplayV1() throws Exception {
-    doTestRestart(true, true, true, true);
-  }
-
-  @Test
-  public void testFastReplayV2() throws Exception {
-    doTestRestart(false, true, true, true);
-  }
-
-  @Test
-  public void testNormalReplayV1() throws Exception {
-    doTestRestart(true, true, true, false);
-  }
-
-  @Test
-  public void testNormalReplayV2() throws Exception {
-    doTestRestart(false, true, true, false);
-  }
-
-  public void doTestRestart(boolean useLogReplayV1,
-          boolean forceCheckpoint, boolean deleteCheckpoint,
-          boolean useFastReplay) throws Exception {
-    Map<String, String> overrides = Maps.newHashMap();
-    overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
-            String.valueOf(useLogReplayV1));
-    overrides.put(
-            FileChannelConfiguration.USE_FAST_REPLAY,
-            String.valueOf(useFastReplay));
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Set<String> in = Sets.newHashSet();
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "restart", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity. [channel="
-          +channel.getName()+"]", e.getMessage());
-    }
-    if (forceCheckpoint) {
-      forceCheckpoint(channel);
-    }
-    channel.stop();
-    if(deleteCheckpoint) {
-      File checkpoint = new File(checkpointDir, "checkpoint");
-      Assert.assertTrue(checkpoint.delete());
-      File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
-      Assert.assertTrue(checkpointMetaData.delete());
-    }
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
-    compareInputAndOut(in, out);
-  }
-  @Test
-  public void testRestartFailsWhenMetaDataExistsButCheckpointDoesNot()
-      throws Exception {
-    Map<String, String> overrides = Maps.newHashMap();
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Assert.assertEquals(1,  putEvents(channel, "restart", 1, 1).size());
-    forceCheckpoint(channel);
-    channel.stop();
-    File checkpoint = new File(checkpointDir, "checkpoint");
-    Assert.assertTrue(checkpoint.delete());
-    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
-    Assert.assertTrue(checkpointMetaData.exists());
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertFalse(channel.isOpen());
-  }
-  @Test
-  public void testRestartFailsWhenCheckpointExistsButMetaDoesNot()
-      throws Exception {
-    Map<String, String> overrides = Maps.newHashMap();
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Assert.assertEquals(1,  putEvents(channel, "restart", 1, 1).size());
-    forceCheckpoint(channel);
-    channel.stop();
-    File checkpoint = new File(checkpointDir, "checkpoint");
-    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
-    Assert.assertTrue(checkpointMetaData.delete());
-    Assert.assertTrue(checkpoint.exists());
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertFalse(channel.isOpen());
-  }
-  @Test
   public void testReconfigure() throws Exception {
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -335,27 +192,6 @@ public class TestFileChannel {
     compareInputAndOut(expected, actual);
   }
   @Test
-  public void testRollbackAfterNoPutTake() throws Exception {
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Transaction transaction;
-    transaction = channel.getTransaction();
-    transaction.begin();
-    transaction.rollback();
-    transaction.close();
-
-    // ensure we can reopen log with no error
-    channel.stop();
-    channel = createFileChannel();
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    transaction = channel.getTransaction();
-    transaction.begin();
-    Assert.assertNull(channel.take());
-    transaction.commit();
-    transaction.close();
-  }
-  @Test
   public void testCommitAfterNoPutTake() throws Exception {
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -466,72 +302,6 @@ public class TestFileChannel {
     Assert.assertTrue(channel.isOpen());
   }
   @Test
-  public void testRollbackSimulatedCrash() throws Exception {
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    int numEvents = 50;
-    Set<String> in = putEvents(channel, "rollback", 1, numEvents);
-
-    Transaction transaction;
-    // put an item we will rollback
-    transaction = channel.getTransaction();
-    transaction.begin();
-    channel.put(EventBuilder.withBody("rolled back".getBytes(Charsets.UTF_8)));
-    transaction.rollback();
-    transaction.close();
-
-    // simulate crash
-    channel.stop();
-    channel = createFileChannel();
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-
-    // we should not get the rolled back item
-    Set<String> out = takeEvents(channel, 1, numEvents);
-    compareInputAndOut(in, out);
-  }
-  @Test
-  public void testRollbackSimulatedCrashWithSink() throws Exception {
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    int numEvents = 100;
-
-    LoggerSink sink = new LoggerSink();
-    sink.setChannel(channel);
-    // sink will leave one item
-    CountingSinkRunner runner = new CountingSinkRunner(sink, numEvents - 1);
-    runner.start();
-    putEvents(channel, "rollback", 10, numEvents);
-
-    Transaction transaction;
-    // put an item we will rollback
-    transaction = channel.getTransaction();
-    transaction.begin();
-    byte[] bytes = "rolled back".getBytes(Charsets.UTF_8);
-    channel.put(EventBuilder.withBody(bytes));
-    transaction.rollback();
-    transaction.close();
-
-    while(runner.isAlive()) {
-      Thread.sleep(10L);
-    }
-    Assert.assertEquals(numEvents - 1, runner.getCount());
-    for(Exception ex : runner.getErrors()) {
-      LOG.warn("Sink had error", ex);
-    }
-    Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors());
-
-    // simulate crash
-    channel.stop();
-    channel = createFileChannel();
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Set<String> out = takeEvents(channel, 1, 1);
-    Assert.assertEquals(1, out.size());
-    String s = out.iterator().next();
-    Assert.assertTrue(s, s.startsWith("rollback-90-9"));
-  }
-  @Test
   public void testThreaded() throws IOException, InterruptedException {
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -612,67 +382,7 @@ public class TestFileChannel {
     fileChannel.start();
     Assert.assertTrue(!fileChannel.isOpen());
   }
-  /**
-   * This is regression test with files generated by a file channel
-   * with the FLUME-1432 patch.
-   */
-  @Test
-  public void testFileFormatV2postFLUME1432()
-          throws Exception {
-    TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz",
-            new File(checkpointDir, "checkpoint"));
-    for (int i = 0; i < dataDirs.length; i++) {
-      int fileIndex = i + 1;
-      TestUtils.copyDecompressed("fileformat-v2-log-"+fileIndex+".gz",
-              new File(dataDirs[i], "log-" + fileIndex));
-    }
-    Map<String, String> overrides = Maps.newHashMap();
-    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10));
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Set<String> events = takeEvents(channel, 1);
-    Set<String> expected = new HashSet<String>();
-    expected.addAll(Arrays.asList(
-            (new String[]{
-              "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691"
-            })));
-    compareInputAndOut(expected, events);
 
-  }
-  /**
-   * This is a regression test with files generated by a file channel
-   * without the FLUME-1432 patch.
-   */
-  @Test
-  public void testFileFormatV2PreFLUME1432LogReplayV1()
-          throws Exception {
-    doTestFileFormatV2PreFLUME1432(true);
-  }
-  @Test
-  public void testFileFormatV2PreFLUME1432LogReplayV2()
-          throws Exception {
-    doTestFileFormatV2PreFLUME1432(false);
-  }
-  public void doTestFileFormatV2PreFLUME1432(boolean useLogReplayV1)
-          throws Exception {
-    TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz",
-            new File(checkpointDir, "checkpoint"));
-    for (int i = 0; i < dataDirs.length; i++) {
-      int fileIndex = i + 1;
-      TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex
-          + ".gz", new File(dataDirs[i], "log-" + fileIndex));
-    }
-    Map<String, String> overrides = Maps.newHashMap();
-    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
-    overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
-            String.valueOf(useLogReplayV1));
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-    Set<String> events = takeEvents(channel, 1);
-    Assert.assertEquals(50, events.size());
-  }
 
   /**
    * Test contributed by Brock Noland during code review.

http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
new file mode 100644
index 0000000..4655978
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Context;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import com.google.common.io.Files;
+
+public class TestFileChannelBase {
+
+  protected FileChannel channel;
+  protected File baseDir;
+  protected File checkpointDir;
+  protected File[] dataDirs;
+  protected String dataDir;
+
+  @Before
+  public void setup() {
+    baseDir = Files.createTempDir();
+    checkpointDir = new File(baseDir, "chkpt");
+    Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
+    dataDirs = new File[3];
+    dataDir = "";
+    for (int i = 0; i < dataDirs.length; i++) {
+      dataDirs[i] = new File(baseDir, "data" + (i + 1));
+      Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory());
+      dataDir += dataDirs[i].getAbsolutePath() + ",";
+    }
+    dataDir = dataDir.substring(0, dataDir.length() - 1);
+    channel = createFileChannel();
+  }
+
+  @After
+  public void teardown() {
+    if (channel != null && channel.isOpen()) {
+      channel.stop();
+    }
+    FileUtils.deleteQuietly(baseDir);
+  }
+
+  protected Context createContext() {
+    return createContext(new HashMap<String, String>());
+  }
+
+  protected Context createContext(Map<String, String> overrides) {
+    return TestUtils.createFileChannelContext(checkpointDir.getAbsolutePath(),
+        dataDir, overrides);
+  }
+
+  protected FileChannel createFileChannel() {
+    return createFileChannel(new HashMap<String, String>());
+  }
+
+  protected FileChannel createFileChannel(Map<String, String> overrides) {
+    return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
+        dataDir, overrides);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
new file mode 100644
index 0000000..8fc0faf
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import static org.apache.flume.channel.file.TestUtils.*;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+
+public class TestFileChannelFormatRegression extends TestFileChannelBase {
+  protected static final Logger LOG = LoggerFactory
+      .getLogger(TestFileChannelFormatRegression.class);
+
+  @Before
+  public void setup() {
+    super.setup();
+  }
+
+  @After
+  public void teardown() {
+    super.teardown();
+  }
+  /**
+   * This is a regression test with files generated by a file channel
+   * with the FLUME-1432 patch.
+   */
+  @Test
+  public void testFileFormatV2postFLUME1432()
+          throws Exception {
+    TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz",
+            new File(checkpointDir, "checkpoint"));
+    for (int i = 0; i < dataDirs.length; i++) {
+      int fileIndex = i + 1;
+      TestUtils.copyDecompressed("fileformat-v2-log-"+fileIndex+".gz",
+              new File(dataDirs[i], "log-" + fileIndex));
+    }
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> events = takeEvents(channel, 1);
+    Set<String> expected = new HashSet<String>();
+    expected.addAll(Arrays.asList(
+            (new String[]{
+              "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691"
+            })));
+    compareInputAndOut(expected, events);
+
+  }
+  /**
+   * This is a regression test with files generated by a file channel
+   * without the FLUME-1432 patch.
+   */
+  @Test
+  public void testFileFormatV2PreFLUME1432LogReplayV1()
+          throws Exception {
+    doTestFileFormatV2PreFLUME1432(true);
+  }
+  @Test
+  public void testFileFormatV2PreFLUME1432LogReplayV2()
+          throws Exception {
+    doTestFileFormatV2PreFLUME1432(false);
+  }
+  public void doTestFileFormatV2PreFLUME1432(boolean useLogReplayV1)
+          throws Exception {
+    TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz",
+            new File(checkpointDir, "checkpoint"));
+    for (int i = 0; i < dataDirs.length; i++) {
+      int fileIndex = i + 1;
+      TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex
+          + ".gz", new File(dataDirs[i], "log-" + fileIndex));
+    }
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
+    overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
+            String.valueOf(useLogReplayV1));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> events = takeEvents(channel, 1);
+    Assert.assertEquals(50, events.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
new file mode 100644
index 0000000..cf61aed
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import static org.apache.flume.channel.file.TestUtils.*;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.ChannelException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TestFileChannelRestart extends TestFileChannelBase {
+  protected static final Logger LOG = LoggerFactory
+      .getLogger(TestFileChannelRestart.class);
+
+  @Before
+  public void setup() {
+    super.setup();
+  }
+
+  @After
+  public void teardown() {
+    super.teardown();
+  }
+  @Test
+  public void testRestartLogReplayV1() throws Exception {
+    doTestRestart(true, false, false, false);
+  }
+  @Test
+  public void testRestartLogReplayV2() throws Exception {
+    doTestRestart(false, false, false, false);
+  }
+
+  @Test
+  public void testFastReplayV1() throws Exception {
+    doTestRestart(true, true, true, true);
+  }
+
+  @Test
+  public void testFastReplayV2() throws Exception {
+    doTestRestart(false, true, true, true);
+  }
+
+  @Test
+  public void testNormalReplayV1() throws Exception {
+    doTestRestart(true, true, true, false);
+  }
+
+  @Test
+  public void testNormalReplayV2() throws Exception {
+    doTestRestart(false, true, true, false);
+  }
+
+  public void doTestRestart(boolean useLogReplayV1,
+          boolean forceCheckpoint, boolean deleteCheckpoint,
+          boolean useFastReplay) throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
+            String.valueOf(useLogReplayV1));
+    overrides.put(
+            FileChannelConfiguration.USE_FAST_REPLAY,
+            String.valueOf(useFastReplay));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = Sets.newHashSet();
+    try {
+      while(true) {
+        in.addAll(putEvents(channel, "restart", 1, 1));
+      }
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity. [channel="
+          +channel.getName()+"]", e.getMessage());
+    }
+    if (forceCheckpoint) {
+      forceCheckpoint(channel);
+    }
+    channel.stop();
+    if(deleteCheckpoint) {
+      File checkpoint = new File(checkpointDir, "checkpoint");
+      Assert.assertTrue(checkpoint.delete());
+      File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+      Assert.assertTrue(checkpointMetaData.delete());
+    }
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    compareInputAndOut(in, out);
+  }
+  @Test
+  public void testRestartFailsWhenMetaDataExistsButCheckpointDoesNot()
+      throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertEquals(1,  putEvents(channel, "restart", 1, 1).size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    Assert.assertTrue(checkpoint.delete());
+    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(checkpointMetaData.exists());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertFalse(channel.isOpen());
+  }
+  @Test
+  public void testRestartFailsWhenCheckpointExistsButMetaDoesNot()
+      throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertEquals(1,  putEvents(channel, "restart", 1, 1).size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(checkpointMetaData.delete());
+    Assert.assertTrue(checkpoint.exists());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertFalse(channel.isOpen());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java
new file mode 100644
index 0000000..5b1a088
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import static org.apache.flume.channel.file.TestUtils.*;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.flume.Transaction;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.LoggerSink;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+
+public class TestFileChannelRollback extends TestFileChannelBase {
+  protected static final Logger LOG = LoggerFactory
+      .getLogger(TestFileChannelRollback.class);
+
+  @Before
+  public void setup() {
+    super.setup();
+  }
+
+  @After
+  public void teardown() {
+    super.teardown();
+  }
+  @Test
+  public void testRollbackAfterNoPutTake() throws Exception {
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Transaction transaction;
+    transaction = channel.getTransaction();
+    transaction.begin();
+    transaction.rollback();
+    transaction.close();
+
+    // ensure we can reopen log with no error
+    channel.stop();
+    channel = createFileChannel();
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    transaction = channel.getTransaction();
+    transaction.begin();
+    Assert.assertNull(channel.take());
+    transaction.commit();
+    transaction.close();
+  }
+  @Test
+  public void testRollbackSimulatedCrash() throws Exception {
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    int numEvents = 50;
+    Set<String> in = putEvents(channel, "rollback", 1, numEvents);
+
+    Transaction transaction;
+    // put an item we will rollback
+    transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("rolled back".getBytes(Charsets.UTF_8)));
+    transaction.rollback();
+    transaction.close();
+
+    // simulate crash
+    channel.stop();
+    channel = createFileChannel();
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+
+    // we should not get the rolled back item
+    Set<String> out = takeEvents(channel, 1, numEvents);
+    compareInputAndOut(in, out);
+  }
+  @Test
+  public void testRollbackSimulatedCrashWithSink() throws Exception {
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    int numEvents = 100;
+
+    LoggerSink sink = new LoggerSink();
+    sink.setChannel(channel);
+    // sink will leave one item
+    CountingSinkRunner runner = new CountingSinkRunner(sink, numEvents - 1);
+    runner.start();
+    putEvents(channel, "rollback", 10, numEvents);
+
+    Transaction transaction;
+    // put an item we will rollback
+    transaction = channel.getTransaction();
+    transaction.begin();
+    byte[] bytes = "rolled back".getBytes(Charsets.UTF_8);
+    channel.put(EventBuilder.withBody(bytes));
+    transaction.rollback();
+    transaction.close();
+
+    while(runner.isAlive()) {
+      Thread.sleep(10L);
+    }
+    Assert.assertEquals(numEvents - 1, runner.getCount());
+    for(Exception ex : runner.getErrors()) {
+      LOG.warn("Sink had error", ex);
+    }
+    Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors());
+
+    // simulate crash
+    channel.stop();
+    channel = createFileChannel();
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = takeEvents(channel, 1, 1);
+    Assert.assertEquals(1, out.size());
+    String s = out.iterator().next();
+    Assert.assertTrue(s, s.startsWith("rollback-90-9"));
+  }
+}

Reply via email to