Repository: flume
Updated Branches:
  refs/heads/trunk 73d874440 -> e5c3e6aa7


FLUME-3080. Close failure in HDFS Sink might cause data loss

If the HDFS Sink tries to close a file but the close fails (e.g. due to a timeout), the
last block might not end up in the COMPLETE state. In this case block recovery should
happen, but as the lease is still held by Flume, the NameNode will start the recovery
process only after the hard limit of 1 hour expires.

This change adds an explicit recoverLease() call in case of close failure.
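
In essence, when close() throws, the sink now asks the NameNode to begin lease recovery
for the in-progress file immediately instead of waiting for the hard limit. A minimal
sketch of that call (mirroring the method added in the diff below; fileSystem and
bucketPath refer to BucketWriter's existing fields):

  // Sketch only: ask the NameNode to recover the lease on the in-progress file.
  // Only DistributedFileSystem exposes recoverLease(), so other FileSystems are skipped.
  if (bucketPath != null && fileSystem instanceof DistributedFileSystem) {
    try {
      ((DistributedFileSystem) fileSystem).recoverLease(new Path(bucketPath));
    } catch (IOException ex) {
      LOG.warn("Lease recovery failed for {}", bucketPath, ex);
    }
  }

The call is best-effort: the IOException is only logged, so a failed recovery attempt
does not mask the original close failure.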

This closes #127

Reviewers: Hari Shreedharan

(Denes Arvay via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e5c3e6aa
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e5c3e6aa
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e5c3e6aa

Branch: refs/heads/trunk
Commit: e5c3e6aa76cf2b0bb0838ff6dcd3853656bff704
Parents: 73d8744
Author: Denes Arvay <[email protected]>
Authored: Mon Apr 3 17:27:19 2017 +0200
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Sun Apr 9 10:28:10 2017 +0000

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java    |  22 +++-
 .../hdfs/TestHDFSEventSinkOnMiniCluster.java    | 122 ++++++++++++++++++-
 2 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e5c3e6aa/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index f6b1734..62e5383 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -30,6 +30,7 @@ import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.slf4j.Logger;
@@ -359,6 +360,22 @@ class BucketWriter {
   }
 
   /**
+   * Tries to start the lease recovery process for the current bucketPath
+   * if the fileSystem is DistributedFileSystem.
+   * Catches and logs the IOException.
+   */
+  private synchronized void recoverLease() {
+    if (bucketPath != null && fileSystem instanceof DistributedFileSystem) {
+      try {
+        LOG.debug("Starting lease recovery for {}", bucketPath);
+        ((DistributedFileSystem) fileSystem).recoverLease(new Path(bucketPath));
+      } catch (IOException ex) {
+        LOG.warn("Lease recovery failed for {}", bucketPath, ex);
+      }
+    }
+  }
+
+  /**
    * Close the file handle and rename the temp file to the permanent filename.
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
    * @throws IOException On failure to rename if temp file exists.
@@ -372,7 +389,7 @@ class BucketWriter {
     } catch (IOException e) {
       LOG.warn("pre-close flush failed", e);
     }
-    boolean failedToClose = false;
+
     LOG.info("Closing {}", bucketPath);
     CallRunner<Void> closeCallRunner = createCloseCallRunner();
     if (isOpen) {
@@ -383,7 +400,8 @@ class BucketWriter {
         LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
                  "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
-        failedToClose = true;
+        // starting lease recovery process, see FLUME-3080
+        recoverLease();
       }
       isOpen = false;
     } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/e5c3e6aa/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
index 7c1caaa..d35bce5 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
@@ -23,10 +23,15 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
+
+import com.google.common.base.Throwables;
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Context;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.event.EventBuilder;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +40,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -471,6 +477,120 @@ public class TestHDFSEventSinkOnMiniCluster {
     cluster = null;
   }
 
+  /**
+   * Tests if the lease gets released if the close() call throws IOException.
+   * For more details see https://issues.apache.org/jira/browse/FLUME-3080
+   */
+  @Test
+  public void testLeaseRecoveredIfCloseThrowsIOException() throws Exception {
+    testLeaseRecoveredIfCloseFails(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        throw new IOException();
+      }
+    });
+  }
+
+  /**
+   * Tests if the lease gets released if the close() call times out.
+   * For more details see https://issues.apache.org/jira/browse/FLUME-3080
+   */
+  @Test
+  public void testLeaseRecoveredIfCloseTimesOut() throws Exception {
+    testLeaseRecoveredIfCloseFails(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        TimeUnit.SECONDS.sleep(30);
+        return null;
+      }
+    });
+  }
+
+  private void testLeaseRecoveredIfCloseFails(final Callable<?> doThisInClose)
+      throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build();
+    cluster.waitActive();
+
+    String outputDir = "/flume/leaseRecovery";
+    Path outputDirPath = new Path(outputDir);
+
+    logger.info("Running test with output dir: {}", outputDir);
+
+    FileSystem fs = cluster.getFileSystem();
+    // ensure output directory is empty
+    if (fs.exists(outputDirPath)) {
+      fs.delete(outputDirPath, true);
+    }
+    String nnURL = getNameNodeURL(cluster);
+
+    Context ctx = new Context();
+    MemoryChannel channel = new MemoryChannel();
+    channel.configure(ctx);
+    channel.start();
+
+    ctx.put("hdfs.path", nnURL + outputDir);
+    ctx.put("hdfs.fileType", HDFSWriterFactory.DataStreamType);
+    ctx.put("hdfs.batchSize", Integer.toString(1));
+    ctx.put("hdfs.callTimeout", Integer.toString(1000));
+
+    HDFSWriter hdfsWriter = new HDFSDataStream() {
+      @Override
+      public void close() throws IOException {
+        try {
+          doThisInClose.call();
+        } catch (Throwable e) {
+          Throwables.propagateIfPossible(e, IOException.class);
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    hdfsWriter.configure(ctx);
+
+    HDFSEventSink sink = new HDFSEventSink();
+    sink.configure(ctx);
+    sink.setMockFs(fs);
+    sink.setMockWriter(hdfsWriter);
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    try {
+      channel.put(EventBuilder.withBody("test", Charsets.UTF_8));
+      txn.commit();
+    } finally {
+      txn.close();
+    }
+
+    sink.process();
+    sink.stop();
+    channel.stop();
+
+    FileStatus[] statuses = fs.listStatus(outputDirPath);
+    Assert.assertEquals(1, statuses.length);
+
+    String filePath = statuses[0].getPath().toUri().getPath();
+    LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem());
+
+    Object lease = lm.getLeaseByPath(filePath);
+    // wait until the NameNode recovers the lease
+    for (int i = 0; i < 10 && lease != null; i++) {
+      TimeUnit.SECONDS.sleep(1);
+      lease = lm.getLeaseByPath(filePath);
+    }
+
+    // There should be no lease for the given path even if close failed as the BucketWriter
+    // explicitly calls the recoverLease()
+    Assert.assertNull(lease);
+
+    if (!KEEP_DATA) {
+      fs.delete(outputDirPath, true);
+    }
+
+    cluster.shutdown();
+    cluster = null;
+  }
+
   @AfterClass
   public static void teardownClass() {
     // restore system state, if needed
