Author: hairong
Date: Fri Jan 15 01:09:26 2010
New Revision: 899506
URL: http://svn.apache.org/viewvc?rev=899506&view=rev
Log:
Revert the HDFS-101 change from branch 0.20.
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=899506&r1=899505&r2=899506&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Fri Jan 15 01:09:26 2010
@@ -81,9 +81,6 @@
HADOOP-6428. HttpServer sleeps with negative values (cos)
- HDFS-101. DFS write pipeline: DFSClient sometimes does not detect second
- datanode failure. (hairong)
-
HADOOP-5623. Fixes a problem to do with status messages getting overwritten
in streaming jobs. (Rick Cox and Jothi Padmanabhan via tomwhite)
Modified:
hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=899506&r1=899505&r2=899506&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Jan 15 01:09:26 2010
@@ -75,7 +75,6 @@
DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private DataNode datanode = null;
- volatile private boolean mirrorError;
BlockReceiver(Block block, DataInputStream in, String inAddr,
String myAddr, boolean isRecovery, String clientName,
@@ -174,19 +173,21 @@
/**
* While writing to mirrorOut, failure to write to mirror should not
- * affect this datanode.
+ * affect this datanode unless a client is writing the block.
*/
private void handleMirrorOutError(IOException ioe) throws IOException {
- LOG.info(datanode.dnRegistration + ": Exception writing block " +
+ LOG.info(datanode.dnRegistration + ":Exception writing block " +
block + " to mirror " + mirrorAddr + "\n" +
StringUtils.stringifyException(ioe));
- if (Thread.interrupted()) { // shut down if the thread is interrupted
+ mirrorOut = null;
+ //
+ // If stream-copy fails, continue
+ // writing to disk for replication requests. For client
+ // writes, return error so that the client can do error
+ // recovery.
+ //
+ if (clientName.length() > 0) {
throw ioe;
- } else { // encounter an error while writing to mirror
- // continue to run even if can not write to mirror
- // notify client of the error
- // and wait for the client to shut down the pipeline
- mirrorError = true;
}
}
@@ -395,8 +396,8 @@
setBlockPosition(offsetInBlock);
- // First write the packet to the mirror:
- if (mirrorOut != null && !mirrorError) {
+ //First write the packet to the mirror:
+ if (mirrorOut != null) {
try {
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
mirrorOut.flush();
@@ -514,8 +515,7 @@
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(this, block, mirrIn,
- replyOut, numTargets,
- Thread.currentThread()));
+ replyOut, numTargets));
responder.start(); // start thread to processes reponses
}
@@ -700,21 +700,18 @@
DataOutputStream replyOut; // output to upstream datanode
private int numTargets; // number of downstream datanodes including myself
private BlockReceiver receiver; // The owner of this responder.
- private Thread receiverThread; // the thread that spawns this responder
public String toString() {
return "PacketResponder " + numTargets + " for Block " + this.block;
}
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
- DataOutputStream out, int numTargets,
- Thread receiverThread) {
+ DataOutputStream out, int numTargets) {
this.receiver = receiver;
this.block = b;
mirrorIn = in;
replyOut = out;
this.numTargets = numTargets;
- this.receiverThread = receiverThread;
}
/**
@@ -851,26 +848,24 @@
}
boolean lastPacketInBlock = false;
- boolean isInterrupted = false;
while (running && datanode.shouldRun && !lastPacketInBlock) {
try {
+ boolean didRead = false;
long expected = -2;
PipelineAck ack = new PipelineAck();
- long seqno = -2;
try {
- if (!mirrorError) {
- // read an ack from downstream datanode
- ack.readFields(mirrorIn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
- }
- seqno = ack.getSeqno();
+ // read an ack from downstream datanode
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
}
+ long seqno = ack.getSeqno();
+ didRead = true;
if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
ack.write(replyOut); // send keepalive
replyOut.flush();
- } else if (seqno >= 0 || mirrorError) {
+ } else if (seqno >= 0) {
Packet pkt = null;
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -882,13 +877,10 @@
}
wait();
}
- if (!running || !datanode.shouldRun) {
- break;
- }
pkt = ackQueue.removeFirst();
expected = pkt.seqno;
notifyAll();
- if (seqno != expected && !mirrorError) {
+ if (seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
" expected seqno:" + expected +
@@ -897,32 +889,27 @@
lastPacketInBlock = pkt.lastPacketInBlock;
}
}
- } catch (InterruptedException ine) {
- isInterrupted = true;
- } catch (IOException ioe) {
- if (Thread.interrupted()) {
- isInterrupted = true;
- } else {
- // continue to run even if can not read from mirror
- // notify client of the error
- // and wait for the client to shut down the pipeline
- mirrorError = true;
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(ioe));
+ } catch (Throwable e) {
+ if (running) {
+ LOG.info("PacketResponder " + block + " " + numTargets +
+ " Exception " + StringUtils.stringifyException(e));
+ running = false;
}
}
- if (Thread.interrupted() || isInterrupted) {
+ if (Thread.interrupted()) {
/* The receiver thread cancelled this thread.
* We could also check any other status updates from the
* receiver thread (e.g. if it is ok to write to replyOut).
* It is prudent to not send any more status back to the client
* because this datanode has a problem. The upstream datanode
- * will detect that this datanode is bad, and rightly so.
+ * will detect a timout on heartbeats and will declare that
+ * this datanode is bad, and rightly so.
*/
LOG.info("PacketResponder " + block + " " + numTargets +
" : Thread is interrupted.");
- break;
+ running = false;
+ continue;
}
// If this is the last packet in block, then close block
@@ -949,7 +936,7 @@
// construct my ack message
short[] replies = null;
- if (mirrorError) { // no ack is read
+ if (!didRead) { // no ack is read
replies = new short[2];
replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
@@ -970,14 +957,24 @@
" for block " + block +
" responded an ack: " + replyAck);
}
- } catch (Throwable e) {
+
+ // If we forwarded an error response from a downstream datanode
+ // and we are acting on behalf of a client, then we quit. The
+ // client will drive the recovery mechanism.
+ if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
+ running = false;
+ }
+ } catch (IOException e) {
if (running) {
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
}
- if (!Thread.interrupted()) { // error not caused by interruption
- receiverThread.interrupt();
+ } catch (RuntimeException e) {
+ if (running) {
+ LOG.info("PacketResponder " + block + " " + numTargets +
+ " Exception " + StringUtils.stringifyException(e));
+ running = false;
}
}
}