Updated Branches: refs/heads/flume-1.3.0 892521fd5 -> 4dea8489e
FLUME-1610: HDFSEventSink and bucket writer have a race condition (Mike Percy via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4dea8489 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4dea8489 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4dea8489 Branch: refs/heads/flume-1.3.0 Commit: 4dea8489e1264e633151ab7e558271bdffce9a69 Parents: 892521f Author: Brock Noland <[email protected]> Authored: Tue Sep 25 17:17:50 2012 -0500 Committer: Brock Noland <[email protected]> Committed: Tue Sep 25 17:18:10 2012 -0500 ---------------------------------------------------------------------- .../org/apache/flume/sink/hdfs/BucketWriter.java | 19 +++-- .../org/apache/flume/sink/hdfs/HDFSEventSink.java | 4 +- .../apache/flume/sink/hdfs/TestBucketWriter.java | 59 +++++++++++++++ 3 files changed, 71 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4dea8489/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 6408eb9..bce8e11 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -236,6 +236,7 @@ class BucketWriter { * @throws IOException On failure to rename if temp file exists. 
*/ public synchronized void close() throws IOException, InterruptedException { + flush(); runPrivileged(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { @@ -281,13 +282,15 @@ class BucketWriter { * flush the data */ public synchronized void flush() throws IOException, InterruptedException { - runPrivileged(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - doFlush(); - return null; - } - }); + if (!isBatchComplete()) { + runPrivileged(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + doFlush(); + return null; + } + }); + } } /** @@ -384,7 +387,7 @@ class BucketWriter { ", bucketPath = " + bucketPath + " ]"; } - public boolean isBatchComplete() { + private boolean isBatchComplete() { return (batchCounter == 0); } } http://git-wip-us.apache.org/repos/asf/flume/blob/4dea8489/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index 9a76ecb..5ec9eb8 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -426,9 +426,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { // flush all pending buckets before committing the transaction for (BucketWriter bucketWriter : writers) { - if (!bucketWriter.isBatchComplete()) { - flush(bucketWriter); - } + flush(bucketWriter); } transaction.commit(); http://git-wip-us.apache.org/repos/asf/flume/blob/4dea8489/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java 
---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index bb12188..60f1830 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -27,7 +27,10 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SinkCounter; +import org.apache.flume.sink.FlumeFormatter; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.CompressionCodec; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -36,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; +import java.io.File; public class TestBucketWriter { @@ -155,4 +159,59 @@ public class TestBucketWriter { Assert.assertEquals("files closed", 2, hdfsWriter.getFilesClosed()); } + @Test + public void testIntervalRollerBug() throws IOException, InterruptedException { + final int ROLL_INTERVAL = 1; // seconds + final int NUM_EVENTS = 10; + + HDFSWriter hdfsWriter = new HDFSWriter() { + private volatile boolean open = false; + @Override + public void configure(Context context) { + + } + @Override + public void sync() throws IOException { + if(!open) { + throw new IOException("closed"); + } + } + @Override + public void open(String filePath, CompressionCodec codec, + CompressionType cType, FlumeFormatter fmt) throws IOException { + open = true; + } + @Override + public void open(String filePath, FlumeFormatter fmt) throws IOException { + open = true; + } + @Override + public void close() throws 
IOException { + open = false; + } + @Override + public void append(Event e, FlumeFormatter fmt) throws IOException { + // we just re-open in append if closed + open = true; + } + }; + HDFSTextFormatter formatter = new HDFSTextFormatter(); + File tmpFile = File.createTempFile("flume", "test"); + tmpFile.deleteOnExit(); + BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, + tmpFile.getName(), null, SequenceFile.CompressionType.NONE, hdfsWriter, + formatter, timedRollerPool, null, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); + for (int i = 0; i < NUM_EVENTS - 1; i++) { + bucketWriter.append(e); + } + + // sleep to force a roll... wait 2x interval just to be sure + Thread.sleep(2 * ROLL_INTERVAL * 1000L); + + bucketWriter.flush(); // throws closed exception + } + }
