Updated Branches: refs/heads/trunk 0c73dc7e7 -> 89308fa8d
FLUME-1505. TestFileChannel needs to be able to force a checkpoint. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/89308fa8 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/89308fa8 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/89308fa8 Branch: refs/heads/trunk Commit: 89308fa8da2525bcdbd81949c7286443db5f558f Parents: 0c73dc7 Author: Hari Shreedharan <[email protected]> Authored: Wed Aug 22 13:16:22 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed Aug 22 13:16:22 2012 -0700 ---------------------------------------------------------------------- flume-ng-channels/flume-file-channel/pom.xml | 7 ++ .../java/org/apache/flume/channel/file/Log.java | 2 +- .../apache/flume/channel/file/TestFileChannel.java | 64 +++++---------- 3 files changed, 29 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/89308fa8/flume-ng-channels/flume-file-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml index cd882e5..62d80e3 100644 --- a/flume-ng-channels/flume-file-channel/pom.xml +++ b/flume-ng-channels/flume-file-channel/pom.xml @@ -81,6 +81,13 @@ </dependency> <dependency> + <groupId>org.easytesting</groupId> + <artifactId>fest-reflect</artifactId> + <version>1.4</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flume/blob/89308fa8/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 9b13423..1e2706b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -647,7 +647,7 @@ class Log { * @param force a flag to force the writing of checkpoint * @throws IOException if we are unable to write the checkpoint out to disk */ - private boolean writeCheckpoint(boolean force) throws Exception { + private Boolean writeCheckpoint(Boolean force) throws Exception { boolean checkpointCompleted = false; boolean lockAcquired = tryLockExclusive(); if(!lockAcquired) { http://git-wip-us.apache.org/repos/asf/flume/blob/89308fa8/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 3dededf..bca2b17 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -17,7 +17,8 @@ * under the License. */ package org.apache.flume.channel.file; - +import static org.fest.reflect.core.Reflection.field; +import static org.fest.reflect.core.Reflection.method; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -64,7 +65,6 @@ import com.google.common.io.Files; import com.google.common.io.Resources; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; public class TestFileChannel { @@ -144,12 +144,7 @@ public class TestFileChannel { channel.take(); } }).get(); - long lastTake = System.currentTimeMillis(); - File inflightsFile = new File(checkpointDir, "inflighttakes"); - - while (inflightsFile.lastModified() < lastTake) { - Thread.sleep(500); - } + forceCheckpoint(channel); channel.stop(); //Simulate a sink, so separate thread. try { @@ -240,11 +235,7 @@ public class TestFileChannel { } } }); - long lastPut = System.currentTimeMillis(); - File checkpoint = new File(checkpointDir, "checkpoint"); - while (checkpoint.lastModified() < lastPut) { - Thread.sleep(500); - } + forceCheckpoint(channel); tx.commit(); tx.close(); latch.countDown(); @@ -810,17 +801,13 @@ public class TestFileChannel { Transaction tx = channel.getTransaction(); tx.begin(); Event e = channel.take(); - long takeTime = System.currentTimeMillis(); Assert.assertNotNull(e); String s = new String(e.getBody(), Charsets.UTF_8); out.add(s); LOG.info("Slow take got " + s); // sleep so a checkpoint occurs. take is before // and commit is after the checkpoint - File checkpoint = new File(checkpointDir, "checkpoint"); - while(checkpoint.lastModified() < takeTime){ - TimeUnit.MILLISECONDS.sleep(500); - } + forceCheckpoint(channel); tx.commit(); tx.close(); channel.stop(); @@ -868,14 +855,7 @@ public class TestFileChannel { tx.begin(); channel.put(EventBuilder.withBody(new byte[]{'c','d'})); set.add(new String(new byte[]{'c', 'd'})); - File checkpoint = new File(checkpointDir, "checkpoint"); - long t1 = System.currentTimeMillis(); - while(checkpoint.lastModified() < t1) { - TimeUnit.MILLISECONDS.sleep(500); - if (System.currentTimeMillis() - checkpoint.lastModified() > 15000) { - throw new TimeoutException("Checkpoint did not happen"); - } - } + forceCheckpoint(channel); tx.commit(); tx.close(); channel.stop(); @@ -916,25 +896,10 @@ public class TestFileChannel { tx.begin(); channel.put(EventBuilder.withBody(new byte[]{'c', 'd'})); set.add(new String(new byte[]{'c','d'})); - File checkpoint = new File(checkpointDir, "checkpoint"); - long t1 = System.currentTimeMillis(); - while (checkpoint.lastModified() < t1) { - TimeUnit.MILLISECONDS.sleep(500); - if(System.currentTimeMillis() - checkpoint.lastModified() > 15000){ - throw new TimeoutException("Checkpoint was expected," - + " but did not happen"); - } - } + forceCheckpoint(channel); tx.commit(); tx.close(); - long t2 = System.currentTimeMillis(); - while(checkpoint.lastModified() < t2){ - TimeUnit.MILLISECONDS.sleep(500); - if (t2 - checkpoint.lastModified() > 15000) { - throw new TimeoutException("Checkpoint was expected, " - + "but did not happen"); - } - } + forceCheckpoint(channel); channel.stop(); channel = createFileChannel(overrides); @@ -953,6 +918,19 @@ public class TestFileChannel { channel.stop(); } + private static void forceCheckpoint(FileChannel channel) { + Log log = field("log") + .ofType(Log.class) + .in(channel) + .get(); + + Assert.assertTrue("writeCheckpoint returned false", + method("writeCheckpoint") + .withReturnType(Boolean.class) + .withParameterTypes(Boolean.class) + .in(log) + .invoke(true)); + } private static void copyDecompressed(String resource, File output) throws IOException { URL input = Resources.getResource(resource);
