Repository: flume Updated Branches: refs/heads/trunk dfa062757 -> ed433ae1b
FLUME-3085. HDFS Sink can skip flushing some BucketWriters, might lead to data loss This commit fixes the issue when in HDFSEventSink.process() a BucketWriter.append() call threw a BucketClosedException then the newly created BucketWriter wasn't flushed after the processing loop. This closes #129 Reviewers: Attila Simon, Mike Percy (Denes Arvay via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ed433ae1 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ed433ae1 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ed433ae1 Branch: refs/heads/trunk Commit: ed433ae1b12d40117ca3dca2ffd57389984ede72 Parents: dfa0627 Author: Denes Arvay <[email protected]> Authored: Thu Apr 20 15:58:47 2017 +0200 Committer: Mike Percy <[email protected]> Committed: Mon May 8 14:02:36 2017 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/hdfs/HDFSEventSink.java | 19 ++--- .../flume/sink/hdfs/TestHDFSEventSink.java | 80 ++++++++++++++++++++ 2 files changed, 90 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ed433ae1/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index 741f01e..40f2f4a 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Calendar; import java.util.LinkedHashMap; +import 
java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TimeZone; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; @@ -40,7 +42,6 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.SystemClock; import org.apache.flume.Transaction; import org.apache.flume.auth.FlumeAuthenticationUtil; -import org.apache.flume.auth.FlumeAuthenticator; import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.BucketPath; @@ -55,7 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class HDFSEventSink extends AbstractSink implements Configurable { @@ -354,9 +354,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable { public Status process() throws EventDeliveryException { Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); - List<BucketWriter> writers = Lists.newArrayList(); transaction.begin(); try { + Set<BucketWriter> writers = new LinkedHashSet<>(); int txnEventCount = 0; for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { Event event = channel.take(); @@ -396,11 +396,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } } - // track the buckets getting written in this transaction - if (!writers.contains(bucketWriter)) { - writers.add(bucketWriter); - } - // Write the data to HDFS try { bucketWriter.append(event); @@ -415,6 +410,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } bucketWriter.append(event); } + + // track the buckets getting written in this transaction + if (!writers.contains(bucketWriter)) { + writers.add(bucketWriter); + } } if (txnEventCount == 0) { @@ -455,7 +455,8 @@ public class 
HDFSEventSink extends AbstractSink implements Configurable { } } - private BucketWriter initializeBucketWriter(String realPath, + @VisibleForTesting + BucketWriter initializeBucketWriter(String realPath, String realName, String lookupPath, HDFSWriter hdfsWriter, WriterCallback closeCallback) { BucketWriter bucketWriter = new BucketWriter(rollInterval, http://git-wip-us.apache.org/repos/asf/flume/blob/ed433ae1/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index 782cf47..bbc0ba8 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -27,11 +27,16 @@ import java.util.Arrays; import java.util.Calendar; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; @@ -58,7 +63,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; @@ -67,6 
+74,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1545,4 +1555,74 @@ public class TestHDFSEventSink { Assert.assertEquals(6, totalRenameAttempts); } + + /** + * BucketWriter.append() can throw a BucketClosedException when called from + * HDFSEventSink.process() due to a race condition between HDFSEventSink.process() and the + * BucketWriter's close threads. + * This test case checks that, if this happens, the newly created BucketWriter is still flushed. + * For more details see FLUME-3085 + */ + @Test + public void testFlushedIfAppendFailedWithBucketClosedException() throws Exception { + final Set<BucketWriter> bucketWriters = new HashSet<>(); + sink = new HDFSEventSink() { + @Override + BucketWriter initializeBucketWriter(String realPath, String realName, String lookupPath, + HDFSWriter hdfsWriter, WriterCallback closeCallback) { + BucketWriter bw = Mockito.spy(super.initializeBucketWriter(realPath, realName, lookupPath, + hdfsWriter, closeCallback)); + try { + // create mock BucketWriters where the first append() succeeds but + // the second call throws a BucketClosedException + Mockito.doCallRealMethod() + .doThrow(BucketClosedException.class) + .when(bw).append(Mockito.any(Event.class)); + } catch (IOException | InterruptedException e) { + Assert.fail("This shouldn't happen, as append() is called during mocking."); + } + bucketWriters.add(bw); + return bw; + } + }; + + Context context = new Context(ImmutableMap.of("hdfs.path", testPath)); + Configurables.configure(sink, context); + + Channel channel = Mockito.spy(new MemoryChannel()); + Configurables.configure(channel, new Context()); + + final Iterator<Event> events = Iterators.forArray( + EventBuilder.withBody("test1".getBytes()), EventBuilder.withBody("test2".getBytes())); +
Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return events.hasNext() ? events.next() : null; + } + }).when(channel).take(); + + sink.setChannel(channel); + sink.start(); + + sink.process(); + + // channel.take() should have been called 3 times (2 events + 1 null) + Mockito.verify(channel, Mockito.times(3)).take(); + + FileSystem fs = FileSystem.get(new Configuration()); + int fileCount = 0; + for (RemoteIterator<LocatedFileStatus> i = fs.listFiles(new Path(testPath), false); + i.hasNext(); i.next()) { + fileCount++; + } + Assert.assertEquals(2, fileCount); + + Assert.assertEquals(2, bucketWriters.size()); + // It is expected that the flush() method was called exactly once for every BucketWriter + for (BucketWriter bw : bucketWriters) { + Mockito.verify(bw, Mockito.times(1)).flush(); + } + + sink.stop(); + } }
