Author: suresh Date: Fri Oct 11 00:05:43 2013 New Revision: 1531154 URL: http://svn.apache.org/r1531154 Log: HDFS-5335. Merge 1531153 from branch-2
Added: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java - copied unchanged from r1531153, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1531154&r1=1531153&r2=1531154&view=diff ============================================================================== --- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 11 00:05:43 2013 @@ -30,6 +30,9 @@ Release 2.2.1 - UNRELEASED HDFS-5337. should do hsync for a commit request even there is no pending writes (brandonli) + HDFS-5335. Hive query failed with possible race in dfs output stream. + (Haohui Mai via suresh) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1531154&r1=1531153&r2=1531154&view=diff ============================================================================== --- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original) +++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Oct 11 00:05:43 2013 @@ -38,6 +38,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CanSetDropBehind; @@ -85,7 +86,6 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.mortbay.log.Log; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; @@ -141,7 +141,7 @@ public class DFSOutputStream extends FSO private long bytesCurBlock = 0; // bytes writen in current block private int packetSize = 0; // write packet size, not including the header. private int chunksPerPacket = 0; - private volatile IOException lastException = null; + private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(); private long artificialSlowdown = 0; private long lastFlushOffset = 0; // offset when flush was invoked //persist blocks on namenode @@ -814,8 +814,8 @@ public class DFSOutputStream extends FSO if (++pipelineRecoveryCount > 5) { DFSClient.LOG.warn("Error recovering pipeline for writing " + block + ". Already retried 5 times for the same packet."); - lastException = new IOException("Failing write. Tried pipeline " + - "recovery 5 times without success."); + lastException.set(new IOException("Failing write. Tried pipeline " + + "recovery 5 times without success.")); streamerClosed = true; return false; } @@ -1005,8 +1005,8 @@ public class DFSOutputStream extends FSO } } if (nodes.length <= 1) { - lastException = new IOException("All datanodes " + pipelineMsg - + " are bad. Aborting..."); + lastException.set(new IOException("All datanodes " + pipelineMsg + + " are bad. Aborting...")); streamerClosed = true; return false; } @@ -1021,7 +1021,7 @@ public class DFSOutputStream extends FSO newnodes.length-errorIndex); nodes = newnodes; hasError = false; - lastException = null; + lastException.set(null); errorIndex = -1; } @@ -1065,7 +1065,7 @@ public class DFSOutputStream extends FSO ExtendedBlock oldBlock = block; do { hasError = false; - lastException = null; + lastException.set(null); errorIndex = -1; success = false; @@ -1279,9 +1279,7 @@ public class DFSOutputStream extends FSO } private void setLastException(IOException e) { - if (lastException == null) { - lastException = e; - } + lastException.compareAndSet(null, e); } } @@ -1313,7 +1311,7 @@ public class DFSOutputStream extends FSO protected void checkClosed() throws IOException { if (closed) { - IOException e = lastException; + IOException e = lastException.get(); throw e != null ? e : new ClosedChannelException(); } } @@ -1469,6 +1467,7 @@ public class DFSOutputStream extends FSO private void waitAndQueueCurrentPacket() throws IOException { synchronized (dataQueue) { + try { // If queue is full, then wait till we have enough space while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) { try { @@ -1487,6 +1486,8 @@ public class DFSOutputStream extends FSO } checkClosed(); queueCurrentPacket(); + } catch (ClosedChannelException e) { + } } } @@ -1735,7 +1736,7 @@ public class DFSOutputStream extends FSO DFSClient.LOG.warn("Error while syncing", e); synchronized (this) { if (!closed) { - lastException = new IOException("IOException flush:" + e); + lastException.set(new IOException("IOException flush:" + e)); closeThreads(true); } } @@ -1793,21 +1794,25 @@ public class DFSOutputStream extends FSO if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Waiting for ack for: " + seqno); } - synchronized (dataQueue) { - while (!closed) { - checkClosed(); - if (lastAckedSeqno >= seqno) { - break; - } - try { - dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue - } catch (InterruptedException ie) { - throw new InterruptedIOException( - "Interrupted while waiting for data to be acknowledged by pipeline"); + try { + synchronized (dataQueue) { + while (!closed) { + checkClosed(); + if (lastAckedSeqno >= seqno) { + break; + } + try { + dataQueue.wait(1000); // when we receive an ack, we notify on + // dataQueue + } catch (InterruptedException ie) { + throw new InterruptedIOException( + "Interrupted while waiting for data to be acknowledged by pipeline"); + } } } + checkClosed(); + } catch (ClosedChannelException e) { } - checkClosed(); } private synchronized void start() { @@ -1853,7 +1858,7 @@ public class DFSOutputStream extends FSO @Override public synchronized void close() throws IOException { if (closed) { - IOException e = lastException; + IOException e = lastException.getAndSet(null); if (e == null) return; else @@ -1880,6 +1885,7 @@ public class DFSOutputStream extends FSO closeThreads(false); completeFile(lastBlock); dfsClient.endFileLease(src); + } catch (ClosedChannelException e) { } finally { closed = true; }