Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 8763d07f9 -> 23a658c4e
Revert "HDFS-13164. File not closed if streamer fail with DSQuotaExceededException."

This reverts commit 8763d07f97c4667566badabc2ec2e2cd9ae92c0e.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/23a658c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/23a658c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/23a658c4

Branch: refs/heads/branch-2.8
Commit: 23a658c4e7e1cb486bf5a83ddd1fb4272e0450c8
Parents: 8763d07
Author: Jason Lowe <[email protected]>
Authored: Mon Feb 26 08:59:11 2018 -0600
Committer: Jason Lowe <[email protected]>
Committed: Mon Feb 26 08:59:11 2018 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  63 ++---------
 .../hadoop/hdfs/client/impl/LeaseRenewer.java   |   2 +-
 .../java/org/apache/hadoop/hdfs/TestQuota.java  | 109 +------------------
 3 files changed, 10 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/23a658c4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index b3245a5..09d3143 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -815,19 +815,7 @@ public class DFSOutputStream extends FSOutputSummer
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
-          closed, getStreamer().streamerClosed());
-      try {
-        getStreamer().getLastException().check(true);
-      } catch (IOException ioe) {
-        cleanupAndRethrowIOException(ioe);
-      } finally {
-        if (!closed) {
-          // If stream is not closed but streamer closed, clean up the stream.
-          // Most importantly, end the file lease.
-          closeThreads(true);
-        }
-      }
+      getStreamer().getLastException().check(true);
       return;
     }

@@ -842,12 +830,14 @@ public class DFSOutputStream extends FSOutputSummer
         setCurrentPacketToEmpty();
       }

-      try {
-        flushInternal(); // flush all data to Datanodes
-      } catch (IOException ioe) {
-        cleanupAndRethrowIOException(ioe);
+      flushInternal(); // flush all data to Datanodes
+      // get last block before destroying the streamer
+      ExtendedBlock lastBlock = getStreamer().getBlock();
+
+      try (TraceScope ignored =
+          dfsClient.getTracer().newScope("completeFile")) {
+        completeFile(lastBlock);
       }
-      completeFile();
     } catch (ClosedChannelException ignored) {
     } finally {
       // Failures may happen when flushing data.
@@ -859,43 +849,6 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }

-  private void completeFile() throws IOException {
-    // get last block before destroying the streamer
-    ExtendedBlock lastBlock = getStreamer().getBlock();
-    try (TraceScope ignored =
-        dfsClient.getTracer().newScope("completeFile")) {
-      completeFile(lastBlock);
-    }
-  }
-
-  /**
-   * Determines whether an IOException thrown needs extra cleanup on the stream.
-   * Space quota exceptions will be thrown when getting new blocks, so the
-   * open HDFS file needs to be closed.
-   *
-   * @param ioe the IOException
-   * @return whether the stream needs cleanup for the given IOException
-   */
-  private boolean exceptionNeedsCleanup(IOException ioe) {
-    return ioe instanceof DSQuotaExceededException
-        || ioe instanceof QuotaByStorageTypeExceededException;
-  }
-
-  private void cleanupAndRethrowIOException(IOException ioe)
-      throws IOException {
-    if (exceptionNeedsCleanup(ioe)) {
-      final MultipleIOException.Builder b = new MultipleIOException.Builder();
-      b.add(ioe);
-      try {
-        completeFile();
-      } catch (IOException e) {
-        b.add(e);
-        throw b.build();
-      }
-    }
-    throw ioe;
-  }
-
   // should be called holding (this) lock since setTestFilename() may
   // be called during unit tests
   protected void completeFile(ExtendedBlock last) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/23a658c4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index 957c0a9..e33d024 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 public class LeaseRenewer {
-  public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
+  static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);

   private static long leaseRenewerGraceDefault = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/23a658c4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
index 48092c7..3ad1811 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
@@ -35,7 +35,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Scanner;

-import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -44,7 +43,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -60,21 +58,14 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;

 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.event.Level;
-import org.slf4j.LoggerFactory;

 /** A class for testing quota-related commands */
 public class TestQuota {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestQuota.class);
-
+
   private static Configuration conf = null;
   private static final ByteArrayOutputStream OUT_STREAM =
       new ByteArrayOutputStream();
   private static final ByteArrayOutputStream ERR_STREAM =
       new ByteArrayOutputStream();
@@ -86,9 +77,6 @@ public class TestQuota {
   /* set a smaller block size so that we can test with smaller space quotas */
   private static final int DEFAULT_BLOCK_SIZE = 512;

-  @Rule
-  public final Timeout testTimeout = new Timeout(120000);
-
   @BeforeClass
   public static void setUpClass() throws Exception {
     conf = new HdfsConfiguration();
@@ -1474,101 +1462,6 @@ public class TestQuota {
         "clrSpaceQuota");
   }

-  @Test
-  public void testSpaceQuotaExceptionOnClose() throws Exception {
-    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
-    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
-    final Path dir = new Path(PathUtils.getTestPath(getClass()),
-        GenericTestUtils.getMethodName());
-    assertTrue(dfs.mkdirs(dir));
-    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
-    assertEquals(0, ToolRunner.run(dfsAdmin, args));
-
-    final Path testFile = new Path(dir, "file");
-    final FSDataOutputStream stream = dfs.create(testFile);
-    stream.write("whatever".getBytes());
-    try {
-      stream.close();
-      fail("close should fail");
-    } catch (DSQuotaExceededException expected) {
-    }
-
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-  }
-
-  @Test
-  public void testSpaceQuotaExceptionOnFlush() throws Exception {
-    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
-    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
-    final Path dir = new Path(PathUtils.getTestPath(getClass()),
-        GenericTestUtils.getMethodName());
-    assertTrue(dfs.mkdirs(dir));
-    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
-    assertEquals(0, ToolRunner.run(dfsAdmin, args));
-
-    Path testFile = new Path(dir, "file");
-    FSDataOutputStream stream = dfs.create(testFile);
-    // get the lease renewer now so we can verify it later without calling
-    // getLeaseRenewer, which will automatically add the client into it.
-    final LeaseRenewer leaseRenewer = dfs.getClient().getLeaseRenewer();
-    stream.write("whatever".getBytes());
-    try {
-      stream.hflush();
-      fail("flush should fail");
-    } catch (DSQuotaExceededException expected) {
-    }
-    // even if we close the stream in finally, it won't help.
-    try {
-      stream.close();
-      fail("close should fail too");
-    } catch (DSQuotaExceededException expected) {
-    }
-
-    GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE);
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LOG.info("LeaseRenewer: {}", leaseRenewer);
-        return leaseRenewer.isEmpty();
-      }
-    }, 100, 10000);
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-  }
-
-  @Test
-  public void testSpaceQuotaExceptionOnAppend() throws Exception {
-    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
-    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
-    final Path dir = new Path(PathUtils.getTestPath(getClass()),
-        GenericTestUtils.getMethodName());
-    dfs.delete(dir, true);
-    assertTrue(dfs.mkdirs(dir));
-    final String[] args =
-        new String[] {"-setSpaceQuota", "4000", dir.toString()};
-    ToolRunner.run(dfsAdmin, args);
-
-    final Path testFile = new Path(dir, "file");
-    OutputStream stream = dfs.create(testFile);
-    stream.write("whatever".getBytes());
-    stream.close();
-
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-
-    stream = dfs.append(testFile);
-    byte[] buf = AppendTestUtil.initBuffer(4096);
-    stream.write(buf);
-    try {
-      stream.close();
-      fail("close after append should fail");
-    } catch (DSQuotaExceededException expected) {
-    }
-    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
-  }
-
   private void testSetAndClearSpaceQuotaNoAccessInternal(
       final String[] args,
       final int cmdRet,
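
----------------------------------------------------------------------
For readers tracing the effect of this revert, below is a minimal
caller-side sketch (not part of the patch) of the code path the reverted
tests exercised: writing past a directory's space quota and catching the
DSQuotaExceededException that close() rethrows. The class and method
names are hypothetical; only the Hadoop APIs used are real.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;

public class QuotaCloseSketch {
  // Writes data to a file under a directory that may have a space quota.
  public static void writeUnderQuota(Configuration conf, Path file,
      byte[] data) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream out = fs.create(file);
    try {
      out.write(data);
      // A space-quota violation detected while the streamer allocates
      // blocks surfaces here from close(). With HDFS-13164 reverted, the
      // client-side completeFile() cleanup that change performed no
      // longer runs, so recovery is left to the caller.
      out.close();
    } catch (DSQuotaExceededException dse) {
      // Caller-side handling: for example, raise the quota with
      // "hdfs dfsadmin -setSpaceQuota" or delete the partial file,
      // then retry. Here we simply propagate.
      throw dse;
    }
  }
}
----------------------------------------------------------------------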
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]