Author: hairong
Date: Mon Nov 1 06:17:40 2010
New Revision: 1029559
URL: http://svn.apache.org/viewvc?rev=1029559&view=rev
Log:
HDFS-724. Use a bidirectional heartbeat to detect stuck pipeline. Contributed
by Hairong Kuang.
Added:
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestStuckDataNode.java
Modified:
hadoop/common/branches/branch-0.20-append/CHANGES.txt
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
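
A note on the mechanism before the diffs: a write pipeline can wedge when one
datanode stops making progress while every socket stays open, so no read ever
times out. This change makes the keepalive bidirectional and client-driven:
whenever the client's DataStreamer has been idle for more than half of the
socket read timeout, it sends a heartbeat packet (sequence number -1) down the
pipeline, and each datanode acks it back upstream, so a genuinely stuck node
now surfaces as an ordinary read timeout. Below is a minimal sketch of the
timing rule; HeartbeatTimer is an illustrative name, not part of the patch,
and the real logic lives inline in DFSClient.DataStreamer.run():

    /**
     * Illustrative sketch of the idle/heartbeat arithmetic used by the
     * patched DataStreamer. All names here are made up for exposition.
     */
    class HeartbeatTimer {
      static final long HEART_BEAT_SEQNO = -1L; // sentinel seqno, as in the patch

      private final int timeoutValue; // socket read timeout, in ms
      private long lastPacket;        // time of the last packet written

      HeartbeatTimer(int timeoutValue) {
        this.timeoutValue = timeoutValue;
        this.lastPacket = System.currentTimeMillis();
      }

      /** True once the stream has idled past half the read timeout. */
      boolean heartbeatDue(long now) {
        return now - lastPacket >= timeoutValue / 2;
      }

      /** How long the streamer may block waiting for data before re-checking. */
      long waitMillis(long now) {
        long timeout = timeoutValue / 2 - (now - lastPacket);
        return timeout <= 0 ? 1000 : timeout;
      }

      /** Call after any packet (data or heartbeat) is flushed downstream. */
      void packetSent(long now) {
        lastPacket = now;
      }
    }
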
Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=1029559&r1=1029558&r2=1029559&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Mon Nov 1 06:17:40 2010
@@ -82,6 +82,9 @@ Release 0.20-append - Unreleased
HDFS-1346. DFSClient receives out of order packet ack. (hairong)
+ HDFS-724. Use a bidirectional heartbeat to detect stuck
+ pipeline. (hairong)
+
Release 0.20.3 - Unreleased
NEW FEATURES
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1029559&r1=1029558&r2=1029559&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Mon Nov 1 06:17:40 2010
@@ -81,6 +81,7 @@ public class DFSClient implements FSCons
private SocketFactory socketFactory;
private int socketTimeout;
private int datanodeWriteTimeout;
+ private int timeoutValue; // read timeout for the socket
final int writePacketSize;
private final FileSystem.Statistics stats;
private int maxBlockAcquireFailures;
@@ -189,6 +190,7 @@ public class DFSClient implements FSCons
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout =
conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
+ this.timeoutValue = this.socketTimeout;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
@@ -2205,7 +2207,28 @@ public class DFSClient implements FSCons
int dataPos;
int checksumStart;
int checksumPos;
-
+
+ private static final long HEART_BEAT_SEQNO = -1L;
+
+ /**
+ * create a heartbeat packet
+ */
+ Packet() {
+ this.lastPacketInBlock = false;
+ this.numChunks = 0;
+ this.offsetInBlock = 0;
+ this.seqno = HEART_BEAT_SEQNO;
+
+ buffer = null;
+ int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ buf = new byte[packetSize];
+
+ checksumStart = dataStart = packetSize;
+ checksumPos = checksumStart;
+ dataPos = dataStart;
+ maxChunks = 0;
+ }
+
// create a new packet
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
@@ -2286,6 +2309,14 @@ public class DFSClient implements FSCons
buffer.reset();
return buffer;
}
+
+ /**
+ * Check if this packet is a heart beat packet
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ private boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
}
//
@@ -2301,6 +2332,8 @@ public class DFSClient implements FSCons
private volatile boolean closed = false;
public void run() {
+ long lastPacket = 0;
+
while (!closed && clientRunning) {
// if the Responder encountered an error, shutdown Responder
@@ -2320,23 +2353,36 @@ public class DFSClient implements FSCons
boolean doSleep = processDatanodeError(hasError, false);
// wait for a packet to be sent.
+ long now = System.currentTimeMillis();
while ((!closed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
+ && dataQueue.size() == 0 &&
+ (blockStream == null || (
+ blockStream != null && now - lastPacket < timeoutValue/2)))
+ || doSleep) {
+ long timeout = timeoutValue/2 - (now-lastPacket);
+ timeout = timeout <= 0 ? 1000 : timeout;
+
try {
- dataQueue.wait(1000);
+ dataQueue.wait(timeout);
+ now = System.currentTimeMillis();
} catch (InterruptedException e) {
}
doSleep = false;
}
- if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ if (closed || hasError || !clientRunning) {
continue;
}
try {
// get packet to be sent.
- one = dataQueue.getFirst();
+ if (dataQueue.isEmpty()) {
+ one = new Packet(); // heartbeat packet
+ } else {
+ one = dataQueue.getFirst(); // regular data packet
+ }
+
long offsetInBlock = one.offsetInBlock;
-
+
// get new block from namenode.
if (blockStream == null) {
LOG.debug("Allocating new block");
@@ -2358,12 +2404,14 @@ public class DFSClient implements FSCons
ByteBuffer buf = one.getBuffer();
// move packet from dataQueue to ackQueue
- dataQueue.removeFirst();
- dataQueue.notifyAll();
- synchronized (ackQueue) {
- ackQueue.addLast(one);
- ackQueue.notifyAll();
- }
+ if (!one.isHeartbeatPacket()) {
+ dataQueue.removeFirst();
+ dataQueue.notifyAll();
+ synchronized (ackQueue) {
+ ackQueue.addLast(one);
+ ackQueue.notifyAll();
+ }
+ }
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
@@ -2372,6 +2420,8 @@ public class DFSClient implements FSCons
blockStream.writeInt(0); // indicate end-of-block
}
blockStream.flush();
+ lastPacket = System.currentTimeMillis();
+
if (LOG.isDebugEnabled()) {
LOG.debug("DataStreamer block " + block +
" wrote packet seqno:" + one.seqno +
@@ -2480,38 +2530,37 @@ public class DFSClient implements FSCons
if (LOG.isDebugEnabled()) {
LOG.debug("DFSClient " + ack);
}
+
+ // processes response status from all datanodes.
+ for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
+ short reply = ack.getReply(i);
+ if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ errorIndex = i; // first bad datanode
+ throw new IOException("Bad response " + reply +
+ " for block " + block +
+ " from datanode " +
+ targets[i].getName());
+ }
+ }
+
long seqno = ack.getSeqno();
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+ assert seqno != PipelineAck.UNKOWN_SEQNO :
+ "Ack for unkown seqno should be a failed ack: " + ack;
+ if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
- } else if (seqno == -2) {
- // This signifies that some pipeline node failed to read downstream
- // and therefore has no idea what sequence number the message corresponds
- // to. So, we don't try to match it up with an ack.
- assert ! ack.isSuccess();
- } else {
- Packet one = null;
- synchronized (ackQueue) {
- one = ackQueue.getFirst();
- }
- if (one.seqno != seqno) {
- throw new IOException("Responseprocessor: Expecting seqno " +
- " for block " + block +
- one.seqno + " but received " + seqno);
- }
- lastPacketInBlock = one.lastPacketInBlock;
}
- // processes response status from all datanodes.
- for (int i = 0; i < targets.length && clientRunning; i++) {
- short reply = ack.getReply(i);
- if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
- errorIndex = i; // first bad datanode
- throw new IOException("Bad response " + reply +
- " for block " + block +
- " from datanode " +
- targets[i].getName());
- }
+ Packet one = null;
+ synchronized (ackQueue) {
+ one = ackQueue.getFirst();
+ }
+
+ if (one.seqno != seqno) {
+ throw new IOException("Responseprocessor: Expecting seqno " +
+ " for block " + block + " " +
+ one.seqno + " but received " + seqno);
}
+ lastPacketInBlock = one.lastPacketInBlock;
synchronized (ackQueue) {
ackQueue.removeFirst();
@@ -2931,7 +2980,7 @@ public class DFSClient implements FSCons
LOG.debug("Connecting to " + nodes[0].getName());
InetSocketAddress target =
NetUtils.createSocketAddr(nodes[0].getName());
s = socketFactory.createSocket();
- int timeoutValue = 3000 * nodes.length + socketTimeout;
+ timeoutValue = 3000 * nodes.length + socketTimeout;
NetUtils.connect(s, target, timeoutValue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
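
Two details in the DFSClient hunks above are easy to miss. First, the
ResponseProcessor now checks the per-datanode statuses before it looks at the
sequence number, scanning from the far end of the pipeline, so a failure ack
tagged with UNKOWN_SEQNO can still be attributed to a datanode. Second,
heartbeat acks (seqno -1) are simply dropped, because heartbeat packets never
enter the ackQueue. A hedged restatement of that control flow as a
stand-alone method; processAck and SUCCESS are made-up names for exposition:

    import java.io.IOException;
    import java.util.Deque;

    class AckSketch {
      static final short SUCCESS = 0;           // stand-in for OP_STATUS_SUCCESS
      static final long HEART_BEAT_SEQNO = -1L; // as in DFSClient.Packet

      /** Mirrors the patched ResponseProcessor: statuses first, then seqno. */
      static void processAck(long seqno, short[] replies, Deque<Long> ackQueue)
          throws IOException {
        for (int i = replies.length - 1; i >= 0; i--) {
          if (replies[i] != SUCCESS) {
            // the caller would record i as errorIndex before bailing out
            throw new IOException("Bad response " + replies[i]
                + " from datanode #" + i);
          }
        }
        if (seqno == HEART_BEAT_SEQNO) {
          return; // heartbeat acks are never matched against the ackQueue
        }
        long expected = ackQueue.getFirst();
        if (expected != seqno) {
          throw new IOException("Expecting seqno " + expected
              + " but received " + seqno);
        }
        ackQueue.removeFirst();
      }
    }
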
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1029559&r1=1029558&r2=1029559&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Mon Nov 1 06:17:40 2010
@@ -34,15 +34,10 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 14:
- * OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
- * including the block id, source, and proxy.
- * OP_COPY_BLOCK is sent from the destination to the proxy, which contains
- * only the block id.
- * A reply to OP_COPY_BLOCK sends the block content.
- * A reply to OP_REPLACE_BLOCK includes an operation status.
+ * Version 15:
+ * A heartbeat is sent from the client to the pipeline and then acked back.
*/
- public static final int DATA_TRANSFER_VERSION = 14;
+ public static final int DATA_TRANSFER_VERSION = 15;
// Processed at datanode stream-handler
public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -66,8 +61,7 @@ public interface DataTransferProtocol {
public static class PipelineAck {
private long seqno;
private short replies[];
- final public static PipelineAck HEART_BEAT =
- new PipelineAck(HEARTBEAT_SEQNO, new short[0]);
+ final public static long UNKOWN_SEQNO = -2;
/** default constructor **/
public PipelineAck() {
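
The protocol bump to version 15 replaces the shared HEART_BEAT ack object
with two sentinel sequence numbers: -1 (Packet.HEART_BEAT_SEQNO) marks a
client-generated keepalive, and -2 (PipelineAck.UNKOWN_SEQNO, spelled as
committed) marks an ack from a datanode that could not read its downstream
reply and therefore has no seqno to report; such an ack must carry a failure
status. A tiny illustrative check, not part of the patch:

    final class SeqnoSentinels {
      static final long HEART_BEAT_SEQNO = -1L; // client keepalive
      static final long UNKOWN_SEQNO = -2L;     // downstream ack never arrived

      /** An ack tagged UNKOWN_SEQNO is well formed only if it reports a failure. */
      static boolean isWellFormed(long seqno, boolean allRepliesSucceeded) {
        return seqno != UNKOWN_SEQNO || !allRepliesSucceeded;
      }
    }
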
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1029559&r1=1029558&r2=1029559&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Nov 1 06:17:40 2010
@@ -471,6 +471,13 @@ class BlockReceiver implements java.io.C
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
datanode.myMetrics.bytesWritten.inc(len);
+
+ /// flush entire packet before sending ack
+ flush();
+
+ // update length only after flush to disk
+ datanode.data.setVisibleLength(block, offsetInBlock);
+
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@@ -478,12 +485,6 @@ class BlockReceiver implements java.io.C
}
}
- /// flush entire packet before sending ack
- flush();
-
- // update length only after flush to disk
- datanode.data.setVisibleLength(block, offsetInBlock);
-
// put in queue for pending acks
if (responder != null) {
((PacketResponder)responder.getRunnable()).enqueue(seqno,
@@ -757,134 +758,50 @@ class BlockReceiver implements java.io.C
notifyAll();
}
- private synchronized void lastDataNodeRun() {
- long lastHeartbeat = System.currentTimeMillis();
- boolean lastPacket = false;
-
- while (running && datanode.shouldRun && !lastPacket) {
- long now = System.currentTimeMillis();
- try {
-
- // wait for a packet to be sent to downstream datanode
- while (running && datanode.shouldRun && ackQueue.size() == 0) {
- long idle = now - lastHeartbeat;
- long timeout = (datanode.socketTimeout/2) - idle;
- if (timeout <= 0) {
- timeout = 1000;
- }
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- if (running) {
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " Interrupted.");
- running = false;
- }
- break;
- }
-
- // send a heartbeat if it is time.
- now = System.currentTimeMillis();
- if (now - lastHeartbeat > datanode.socketTimeout/2) {
- PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
- replyOut.flush();
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " sent a heartbeat");
- }
- lastHeartbeat = now;
- }
- }
-
- if (!running || !datanode.shouldRun) {
- break;
- }
- Packet pkt = ackQueue.removeFirst();
- long expected = pkt.seqno;
- notifyAll();
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " acking for packet " + expected);
-
- // If this is the last packet in block, then close block
- // file and finalize the block before responding success
- if (pkt.lastPacketInBlock) {
- if (!receiver.finalized) {
- receiver.close();
- block.setNumBytes(receiver.offsetInBlock);
- datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName,
- datanode.dnRegistration.getStorageID(), block));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
- }
- lastPacket = true;
- }
-
- new PipelineAck(expected, new short[]{
- DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
- replyOut.flush();
- } catch (Exception e) {
- LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
- if (running) {
- try {
- datanode.checkDiskError(e); // may throw an exception here
- } catch (IOException ioe) {
- LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with:
",
- ioe);
- }
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
- running = false;
- }
- }
- }
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " terminating");
- }
-
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
public void run() {
-
- // If this is the last datanode in pipeline, then handle differently
- if (numTargets == 0) {
- lastDataNodeRun();
- return;
- }
-
boolean lastPacketInBlock = false;
boolean isInterrupted = false;
while (running && datanode.shouldRun && !lastPacketInBlock) {
try {
- /**
- * Sequence number -2 is a special value that is used when
- * a DN fails to read an ack from a downstream. In this case,
- * it needs to tell the client that there's been an error downstream
- * but has no valid sequence number to use. Thus, -2 is used
- * as an UNKNOWN value.
- */
- long expected = -2;
- long seqno = -2;
-
- PipelineAck ack = new PipelineAck();
- boolean localMirrorError = mirrorError;
- try {
- if (!localMirrorError) {
+ /**
+ * Sequence number -2 is a special value that is used when
+ * a DN fails to read an ack from a downstream. In this case,
+ * it needs to tell the client that there's been an error downstream
+ * but has no valid sequence number to use. Thus, -2 is used
+ * as an UNKNOWN value.
+ */
+ long expected = PipelineAck.UNKOWN_SEQNO;
+ long seqno = PipelineAck.UNKOWN_SEQNO;
+
+ PipelineAck ack = new PipelineAck();
+ boolean localMirrorError = mirrorError;
+ try {
+ Packet pkt = null;
+ synchronized (this) {
+ // wait for a packet to arrive
+ while (running && datanode.shouldRun && ackQueue.size() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " seqno = " + seqno +
+ " for block " + block +
+ " waiting for local datanode to finish write.");
+ }
+ wait();
+ }
+ if (!running || !datanode.shouldRun) {
+ break;
+ }
+ pkt = ackQueue.removeFirst();
+ expected = pkt.seqno;
+ notifyAll();
+ }
+ // receive an ack if DN is not the last one in the pipeline
+ if (numTargets > 0 && !localMirrorError) {
// read an ack from downstream datanode
ack.readFields(mirrorIn, numTargets);
if (LOG.isDebugEnabled()) {
@@ -892,34 +809,15 @@ class BlockReceiver implements java.io.C
" for block " + block + " got " + ack);
}
seqno = ack.getSeqno();
- }
- if (seqno >= 0 || localMirrorError) {
- Packet pkt = null;
- synchronized (this) {
- while (running && datanode.shouldRun && ackQueue.size() == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " seqno = " + seqno +
- " for block " + block +
- " waiting for local datanode to finish
write.");
- }
- wait();
- }
- if (!running || !datanode.shouldRun) {
- break;
- }
- pkt = ackQueue.removeFirst();
- expected = pkt.seqno;
- notifyAll();
- if (seqno != expected && !localMirrorError) {
- throw new IOException("PacketResponder " + numTargets +
- " for block " + block +
- " expected seqno:" + expected +
- " received:" + seqno);
- }
- lastPacketInBlock = pkt.lastPacketInBlock;
+ // verify seqno
+ if (seqno != expected) {
+ throw new IOException("PacketResponder " + numTargets +
+ " for block " + block +
+ " expected seqno:" + expected +
+ " received:" + seqno);
}
}
+ lastPacketInBlock = pkt.lastPacketInBlock;
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
@@ -970,25 +868,21 @@ class BlockReceiver implements java.io.C
}
}
- PipelineAck replyAck;
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
- replyAck = ack; // continue to send keep alive
+ // construct my ack message
+ short[] replies = null;
+ if (mirrorError) { // no ack is read
+ replies = new short[2];
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+ replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
} else {
- // construct my ack message
- short[] replies = null;
- if (mirrorError) { // no ack is read
- replies = new short[2];
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
- replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
- } else {
- replies = new short[1+ack.getNumOfReplies()];
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
- for (int i=0; i<ack.getNumOfReplies(); i++) {
- replies[i+1] = ack.getReply(i);
- }
- }
- replyAck = new PipelineAck(expected, replies);
+ short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+ replies = new short[1+ackLen];
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+ for (int i=0; i<ackLen; i++) {
+ replies[i+1] = ack.getReply(i);
+ }
}
+ PipelineAck replyAck = new PipelineAck(expected, replies);
// send my ack back to upstream datanode
replyAck.write(replyOut);
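
With the client generating heartbeats, the last datanode no longer needs its
own keepalive loop, which is why lastDataNodeRun() is deleted and a single
run() now serves every position in the pipeline. Each responder prepends its
own status to whatever it heard from downstream; on a mirror error it reports
success for itself and an error for the downstream leg so the client can
locate the first bad node. A minimal sketch under assumed names (buildReplies
and illustrative status codes):

    static short[] buildReplies(boolean mirrorError, short[] downstream) {
      final short SUCCESS = 0, ERROR = 1; // stand-ins for the OP_STATUS_* codes
      if (mirrorError) {
        // no downstream ack was read: blame the downstream leg
        return new short[] { SUCCESS, ERROR };
      }
      short[] replies = new short[1 + downstream.length];
      replies[0] = SUCCESS; // this datanode's own status
      System.arraycopy(downstream, 0, replies, 1, downstream.length);
      return replies;
    }

For the last datanode in the pipeline, downstream has length zero, so the ack
carries only that node's own status, matching the ackLen computation above.
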
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend.java?rev=1029559&r1=1029558&r2=1029559&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend.java Mon Nov 1 06:17:40 2010
@@ -22,6 +22,9 @@ import java.io.*;
import java.net.*;
import java.util.List;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,6 +44,11 @@ import org.apache.hadoop.hdfs.server.dat
* support HDFS appends.
*/
public class TestFileAppend extends TestCase {
+ {
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
static final int blockSize = 1024;
static final int numBlocks = 10;
static final int fileSize = numBlocks * blockSize + 1;
@@ -123,7 +131,7 @@ public class TestFileAppend extends Test
private void checkFullFile(FileSystem fs, Path name) throws IOException {
FSDataInputStream stm = fs.open(name);
- byte[] actual = new byte[fileSize];
+ byte[] actual = new byte[fileContents.length];
stm.readFully(0, actual);
checkData(actual, 0, fileContents, "Read 2");
stm.close();
@@ -307,4 +315,59 @@ public class TestFileAppend extends Test
cluster.shutdown();
}
}
+
+
+ /** This creates a slow writer and checks to see
+ * if pipeline heartbeats work fine
+ */
+ public void testPipelineHeartbeat() throws Exception {
+ final int DATANODE_NUM = 2;
+ final int fileLen = 6;
+ Configuration conf = new Configuration();
+ final int timeout = 2000;
+ conf.setInt("dfs.socket.timeout",timeout);
+
+ final Path p = new Path("/pipelineHeartbeat/foo");
+ System.out.println("p=" + p);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+ DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+ initBuffer(fileLen);
+
+ try {
+ // create a new file.
+ FSDataOutputStream stm = createFile(fs, p, DATANODE_NUM);
+
+ stm.write(fileContents, 0, 1);
+ Thread.sleep(timeout);
+ stm.sync();
+ System.out.println("Wrote 1 byte and hflush " + p);
+
+ // write another byte
+ Thread.sleep(timeout);
+ stm.write(fileContents, 1, 1);
+ stm.sync();
+
+ stm.write(fileContents, 2, 1);
+ Thread.sleep(timeout);
+ stm.sync();
+
+ stm.write(fileContents, 3, 1);
+ Thread.sleep(timeout);
+ stm.write(fileContents, 4, 1);
+ stm.sync();
+
+ stm.write(fileContents, 5, 1);
+ Thread.sleep(timeout);
+ stm.close();
+
+ // verify that entire file is good
+ checkFullFile(fs, p);
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1029559&r1=1029558&r2=1029559&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Mon Nov 1 06:17:40 2010
@@ -965,7 +965,7 @@ public class TestFileAppend4 extends Tes
* Mockito answer helper that triggers one latch as soon as the
* method is called, then waits on another before continuing.
*/
- private static class DelayAnswer implements Answer {
+ public static class DelayAnswer implements Answer {
private final CountDownLatch fireLatch = new CountDownLatch(1);
private final CountDownLatch waitLatch = new CountDownLatch(1);
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestStuckDataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestStuckDataNode.java?rev=1029559&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestStuckDataNode.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestStuckDataNode.java Mon Nov 1 06:17:40 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestFileAppend4.DelayAnswer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * This class tests the building blocks that are needed to
+ * support HDFS appends.
+ */
+public class TestStuckDataNode extends TestCase {
+ {
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ /** This creates a slow writer and checks to see
+ * if pipeline heartbeats work fine
+ */
+ public void testStuckDataNode() throws Exception {
+ final int DATANODE_NUM = 3;
+ Configuration conf = new Configuration();
+ final int timeout = 8000;
+ conf.setInt("dfs.socket.timeout",timeout);
+
+ final Path p = new Path("/pipelineHeartbeat/foo");
+ System.out.println("p=" + p);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+ DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+ DataNodeMetrics metrics = cluster.getDataNodes().get(0).myMetrics;
+ MetricsTimeVaryingLong spyBytesWritten = spy(metrics.bytesWritten);
+ DelayAnswer delayAnswer = new DelayAnswer();
+ doAnswer(delayAnswer).when(spyBytesWritten).inc(anyInt());
+ metrics.bytesWritten = spyBytesWritten;
+
+ try {
+ // create a new file.
+ FSDataOutputStream stm = fs.create(p);
+ stm.write(1);
+ stm.sync();
+ stm.write(2);
+ stm.close();
+
+ // verify that entire file is good
+ FSDataInputStream in = fs.open(p);
+ assertEquals(1, in.read());
+ assertEquals(2, in.read());
+ in.close();
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+}