Repository: flume
Updated Branches:
  refs/heads/trunk dfa062757 -> ed433ae1b


FLUME-3085. HDFS Sink can skip flushing some BucketWriters, might lead to data 
loss

This commit fixes the issue when in HDFSEventSink.process() a 
BucketWriter.append()
call threw a BucketClosedException then the newly created BucketWriter wasn't
flushed after the processing loop.

This closes #129

Reviewers: Attila Simon, Mike Percy

(Denes Arvay via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ed433ae1
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ed433ae1
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ed433ae1

Branch: refs/heads/trunk
Commit: ed433ae1b12d40117ca3dca2ffd57389984ede72
Parents: dfa0627
Author: Denes Arvay <[email protected]>
Authored: Thu Apr 20 15:58:47 2017 +0200
Committer: Mike Percy <[email protected]>
Committed: Mon May 8 14:02:36 2017 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/HDFSEventSink.java   | 19 ++---
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 80 ++++++++++++++++++++
 2 files changed, 90 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ed433ae1/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 741f01e..40f2f4a 100644
--- 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
@@ -40,7 +42,6 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
 import org.apache.flume.auth.FlumeAuthenticationUtil;
-import org.apache.flume.auth.FlumeAuthenticator;
 import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
@@ -55,7 +56,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class HDFSEventSink extends AbstractSink implements Configurable {
@@ -354,9 +354,9 @@ public class HDFSEventSink extends AbstractSink implements 
Configurable {
   public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
-    List<BucketWriter> writers = Lists.newArrayList();
     transaction.begin();
     try {
+      Set<BucketWriter> writers = new LinkedHashSet<>();
       int txnEventCount = 0;
       for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
         Event event = channel.take();
@@ -396,11 +396,6 @@ public class HDFSEventSink extends AbstractSink implements 
Configurable {
           }
         }
 
-        // track the buckets getting written in this transaction
-        if (!writers.contains(bucketWriter)) {
-          writers.add(bucketWriter);
-        }
-
         // Write the data to HDFS
         try {
           bucketWriter.append(event);
@@ -415,6 +410,11 @@ public class HDFSEventSink extends AbstractSink implements 
Configurable {
           }
           bucketWriter.append(event);
         }
+
+        // track the buckets getting written in this transaction
+        if (!writers.contains(bucketWriter)) {
+          writers.add(bucketWriter);
+        }
       }
 
       if (txnEventCount == 0) {
@@ -455,7 +455,8 @@ public class HDFSEventSink extends AbstractSink implements 
Configurable {
     }
   }
 
-  private BucketWriter initializeBucketWriter(String realPath,
+  @VisibleForTesting
+  BucketWriter initializeBucketWriter(String realPath,
       String realName, String lookupPath, HDFSWriter hdfsWriter,
       WriterCallback closeCallback) {
     BucketWriter bucketWriter = new BucketWriter(rollInterval,

http://git-wip-us.apache.org/repos/asf/flume/blob/ed433ae1/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
 
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 782cf47..bbc0ba8 100644
--- 
a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ 
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -27,11 +27,16 @@ import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
@@ -58,7 +63,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -67,6 +74,9 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1545,4 +1555,74 @@ public class TestHDFSEventSink {
     Assert.assertEquals(6, totalRenameAttempts);
 
   }
+
+  /**
+   * BucketWriter.append() can throw a BucketClosedException when called from
+   * HDFSEventSink.process() due to a race condition between 
HDFSEventSink.process() and the
+   * BucketWriter's close threads.
+   * This test case tests whether if this happens the newly created 
BucketWriter will be flushed.
+   * For more details see FLUME-3085
+   */
+  @Test
+  public void testFlushedIfAppendFailedWithBucketClosedException() throws 
Exception {
+    final Set<BucketWriter> bucketWriters = new HashSet<>();
+    sink = new HDFSEventSink() {
+      @Override
+      BucketWriter initializeBucketWriter(String realPath, String realName, 
String lookupPath,
+                                          HDFSWriter hdfsWriter, 
WriterCallback closeCallback) {
+        BucketWriter bw = Mockito.spy(super.initializeBucketWriter(realPath, 
realName, lookupPath,
+            hdfsWriter, closeCallback));
+        try {
+          // create mock BucketWriters where the first append() succeeds but 
the
+          // the second call throws a BucketClosedException
+          Mockito.doCallRealMethod()
+              .doThrow(BucketClosedException.class)
+              .when(bw).append(Mockito.any(Event.class));
+        } catch (IOException | InterruptedException e) {
+          Assert.fail("This shouldn't happen, as append() is called during 
mocking.");
+        }
+        bucketWriters.add(bw);
+        return bw;
+      }
+    };
+
+    Context context = new Context(ImmutableMap.of("hdfs.path", testPath));
+    Configurables.configure(sink, context);
+
+    Channel channel = Mockito.spy(new MemoryChannel());
+    Configurables.configure(channel, new Context());
+
+    final Iterator<Event> events = Iterators.forArray(
+        EventBuilder.withBody("test1".getBytes()), 
EventBuilder.withBody("test2".getBytes()));
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return events.hasNext() ? events.next() : null;
+      }
+    }).when(channel).take();
+
+    sink.setChannel(channel);
+    sink.start();
+
+    sink.process();
+
+    // channel.take() should have called 3 times (2 events + 1 null)
+    Mockito.verify(channel, Mockito.times(3)).take();
+
+    FileSystem fs = FileSystem.get(new Configuration());
+    int fileCount = 0;
+    for (RemoteIterator<LocatedFileStatus> i = fs.listFiles(new 
Path(testPath), false);
+         i.hasNext(); i.next()) {
+      fileCount++;
+    }
+    Assert.assertEquals(2, fileCount);
+
+    Assert.assertEquals(2, bucketWriters.size());
+    // It is expected that flush() method was called exactly once for every 
BucketWriter
+    for (BucketWriter bw : bucketWriters) {
+      Mockito.verify(bw, Mockito.times(1)).flush();
+    }
+
+    sink.stop();
+  }
 }

Reply via email to