Repository: hadoop
Updated Branches:
  refs/heads/branch-3.1 1f0952bfc -> 25f18619f


HDFS-13164. File not closed if streamer fail with DSQuotaExceededException.

(cherry picked from commit 51088d323359587dca7831f74c9d065c2fccc60d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/25f18619
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/25f18619
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/25f18619

Branch: refs/heads/branch-3.1
Commit: 25f18619f8b9da0c25ab9d05237a2eaf88a3ef19
Parents: 1f0952b
Author: Xiao Chen <x...@apache.org>
Authored: Fri Feb 23 13:47:39 2018 -0800
Committer: Xiao Chen <x...@apache.org>
Committed: Fri Feb 23 14:00:17 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  63 +++++++++--
 .../hadoop/hdfs/client/impl/LeaseRenewer.java   |   2 +-
 .../java/org/apache/hadoop/hdfs/TestQuota.java  | 107 +++++++++++++++++++
 3 files changed, 163 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f18619/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 7849796..9734752 100755
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -852,7 +852,19 @@ public class DFSOutputStream extends FSOutputSummer
 
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      getStreamer().getLastException().check(true);
+      LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
+          closed, getStreamer().streamerClosed());
+      try {
+        getStreamer().getLastException().check(true);
+      } catch (IOException ioe) {
+        cleanupAndRethrowIOException(ioe);
+      } finally {
+        if (!closed) {
+          // If stream is not closed but streamer closed, clean up the stream.
+          // Most importantly, end the file lease.
+          closeThreads(true);
+        }
+      }
       return;
     }
 
@@ -867,14 +879,12 @@ public class DFSOutputStream extends FSOutputSummer
         setCurrentPacketToEmpty();
       }
 
-      flushInternal();             // flush all data to Datanodes
-      // get last block before destroying the streamer
-      ExtendedBlock lastBlock = getStreamer().getBlock();
-
-      try (TraceScope ignored =
-               dfsClient.getTracer().newScope("completeFile")) {
-        completeFile(lastBlock);
+      try {
+        flushInternal();             // flush all data to Datanodes
+      } catch (IOException ioe) {
+        cleanupAndRethrowIOException(ioe);
       }
+      completeFile();
     } catch (ClosedChannelException ignored) {
     } finally {
       // Failures may happen when flushing data.
@@ -886,6 +896,43 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
+  private void completeFile() throws IOException {
+    // get last block before destroying the streamer
+    ExtendedBlock lastBlock = getStreamer().getBlock();
+    try (TraceScope ignored =
+        dfsClient.getTracer().newScope("completeFile")) {
+      completeFile(lastBlock);
+    }
+  }
+
+  /**
+   * Determines whether an IOException thrown needs extra cleanup on the stream.
+   * Space quota exceptions will be thrown when getting new blocks, so the
+   * open HDFS file needs to be closed.
+   *
+   * @param ioe the IOException
+   * @return whether the stream needs cleanup for the given IOException
+   */
+  private boolean exceptionNeedsCleanup(IOException ioe) {
+    return ioe instanceof DSQuotaExceededException
+        || ioe instanceof QuotaByStorageTypeExceededException;
+  }
+
+  private void cleanupAndRethrowIOException(IOException ioe)
+      throws IOException {
+    if (exceptionNeedsCleanup(ioe)) {
+      final MultipleIOException.Builder b = new MultipleIOException.Builder();
+      b.add(ioe);
+      try {
+        completeFile();
+      } catch (IOException e) {
+        b.add(e);
+        throw b.build();
+      }
+    }
+    throw ioe;
+  }
+
   // should be called holding (this) lock since setTestFilename() may
   // be called during unit tests
   protected void completeFile(ExtendedBlock last) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f18619/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index e33d024..957c0a9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 public class LeaseRenewer {
-  static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
+  public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
 
   private static long leaseRenewerGraceDefault = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f18619/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
index 88889f3..2924a74 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
@@ -35,6 +35,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Scanner;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -58,13 +60,20 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+import org.slf4j.LoggerFactory;
 
 /** A class for testing quota-related commands */
 public class TestQuota {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestQuota.class);
   
   private static Configuration conf = null;
   private static final ByteArrayOutputStream OUT_STREAM = new 
ByteArrayOutputStream();
@@ -77,6 +86,9 @@ public class TestQuota {
   /* set a smaller block size so that we can test with smaller space quotas */
   private static final int DEFAULT_BLOCK_SIZE = 512;
 
+  @Rule
+  public final Timeout testTestout = new Timeout(120000);
+
   @BeforeClass
   public static void setUpClass() throws Exception {
     conf = new HdfsConfiguration();
@@ -1479,6 +1491,101 @@ public class TestQuota {
         "clrSpaceQuota");
   }
 
+  @Test
+  public void testSpaceQuotaExceptionOnClose() throws Exception {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final Path dir = new Path(PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
+    assertEquals(0, ToolRunner.run(dfsAdmin, args));
+
+    final Path testFile = new Path(dir, "file");
+    final FSDataOutputStream stream = dfs.create(testFile);
+    stream.write("whatever".getBytes());
+    try {
+      stream.close();
+      fail("close should fail");
+    } catch (DSQuotaExceededException expected) {
+    }
+
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+  }
+
+  @Test
+  public void testSpaceQuotaExceptionOnFlush() throws Exception {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final Path dir = new Path(PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
+    assertEquals(0, ToolRunner.run(dfsAdmin, args));
+
+    Path testFile = new Path(dir, "file");
+    FSDataOutputStream stream = dfs.create(testFile);
+    // get the lease renewer now so we can verify it later without calling
+    // getLeaseRenewer, which will automatically add the client into it.
+    final LeaseRenewer leaseRenewer = dfs.getClient().getLeaseRenewer();
+    stream.write("whatever".getBytes());
+    try {
+      stream.hflush();
+      fail("flush should fail");
+    } catch (DSQuotaExceededException expected) {
+    }
+    // even if we close the stream in finally, it won't help.
+    try {
+      stream.close();
+      fail("close should fail too");
+    } catch (DSQuotaExceededException expected) {
+    }
+
+    GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("LeaseRenewer: {}", leaseRenewer);
+        return leaseRenewer.isEmpty();
+      }
+    }, 100, 10000);
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+  }
+
+  @Test
+  public void testSpaceQuotaExceptionOnAppend() throws Exception {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final Path dir = new Path(PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    dfs.delete(dir, true);
+    assertTrue(dfs.mkdirs(dir));
+    final String[] args =
+        new String[] {"-setSpaceQuota", "4000", dir.toString()};
+    ToolRunner.run(dfsAdmin, args);
+
+    final Path testFile = new Path(dir, "file");
+    OutputStream stream = dfs.create(testFile);
+    stream.write("whatever".getBytes());
+    stream.close();
+
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+
+    stream = dfs.append(testFile);
+    byte[] buf = AppendTestUtil.initBuffer(4096);
+    stream.write(buf);
+    try {
+      stream.close();
+      fail("close after append should fail");
+    } catch (DSQuotaExceededException expected) {
+    }
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+  }
+
   private void testSetAndClearSpaceQuotaNoAccessInternal(
       final String[] args,
       final int cmdRet,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to