Author: hairong
Date: Fri Jan 15 01:12:55 2010
New Revision: 899508
URL: http://svn.apache.org/viewvc?rev=899508&view=rev
Log:
Revert the change that HDFS-793 made to branch 0.20.
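For context, HDFS-793 had replaced the raw per-field ack stream with a single self-describing PipelineAck message; this revert restores the raw format and drops DATA_TRANSFER_VERSION from 18 back to 14. A minimal sketch of the two wire layouts, reconstructed from the diffs below (class and method names are illustrative, not from the tree):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustrative only: layouts taken from the diffs in this commit.
    public class AckFormats {
        static final short OP_STATUS_SUCCESS = 0; // value from DataTransferProtocol

        // Reverted layout (DATA_TRANSFER_VERSION = 14): a long seqno followed by
        // one status short per datanode; the count is implied by the pipeline
        // length, so it is not carried on the wire. seqno -1 is a heartbeat.
        static void writeV14Ack(DataOutputStream out, long seqno, short[] statuses)
                throws IOException {
            out.writeLong(seqno);
            for (short s : statuses) {
                out.writeShort(s);
            }
        }

        // HDFS-793 layout (DATA_TRANSFER_VERSION = 18): seqno, then an explicit
        // reply count, then the statuses, so a receiver can consume a whole ack
        // as one message (this is what PipelineAck.write produced).
        static void writeV18Ack(DataOutputStream out, long seqno, short[] statuses)
                throws IOException {
            out.writeLong(seqno);
            out.writeShort((short) statuses.length);
            for (short s : statuses) {
                out.writeShort(s);
            }
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            writeV14Ack(new DataOutputStream(buf), 100L,
                        new short[] { OP_STATUS_SUCCESS, OP_STATUS_SUCCESS });
            System.out.println("v14 ack: " + buf.size() + " bytes"); // 8 + 2*2 = 12
        }
    }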
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=899508&r1=899507&r2=899508&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Fri Jan 15 01:12:55 2010
@@ -70,9 +70,6 @@
MAPREDUCE-1182. Fix overflow in reduce causing allocations to exceed the
configured threshold. (cdouglas)
- HDFS-793. Data node should receive the whole packet ack message before
- it constructs and sends its own ack message for the packet. (hairong)
-
HADOOP-6386. NameNode's HttpServer can't instantiate InetSocketAddress:
IllegalArgumentException is thrown. (cos)
Modified: hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=899508&r1=899507&r2=899508&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 15 01:12:55 2010
@@ -29,7 +29,6 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -2397,18 +2396,14 @@
public void run() {
this.setName("ResponseProcessor for block " + block);
- PipelineAck ack = new PipelineAck();
while (!closed && clientRunning && !lastPacketInBlock) {
// process responses from datanodes.
try {
- // read an ack from the pipeline
- ack.readFields(blockReplyStream);
- if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient " + ack);
- }
- long seqno = ack.getSeqno();
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+ // verify seqno from datanode
+ long seqno = blockReplyStream.readLong();
+ LOG.debug("DFSClient received ack for seqno " + seqno);
+ if (seqno == -1) {
continue;
} else if (seqno == -2) {
// do nothing
@@ -2426,8 +2421,8 @@
}
// processes response status from all datanodes.
- for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
- short reply = ack.getReply(i);
+ for (int i = 0; i < targets.length && clientRunning; i++) {
+ short reply = blockReplyStream.readShort();
if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
Modified: hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=899508&r1=899507&r2=899508&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Jan 15 01:12:55 2010
@@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hdfs.protocol;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
/**
*
@@ -36,11 +31,15 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 18:
- * Change the block packet ack protocol to include seqno,
- * numberOfReplies, reply0, reply1, ...
+ * Version 14:
+ * OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
+ * including the block id, source, and proxy.
+ * OP_COPY_BLOCK is sent from the destination to the proxy, which contains
+ * only the block id.
+ * A reply to OP_COPY_BLOCK sends the block content.
+ * A reply to OP_REPLACE_BLOCK includes an operation status.
*/
- public static final int DATA_TRANSFER_VERSION = 18;
+ public static final int DATA_TRANSFER_VERSION = 14;
// Processed at datanode stream-handler
public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -57,97 +56,6 @@
public static final int OP_STATUS_ERROR_EXISTS = 4;
public static final int OP_STATUS_CHECKSUM_OK = 5;
- /** reply **/
- public static class PipelineAck implements Writable {
- private long seqno;
- private short replies[];
- final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new short[0]);
-
- /** default constructor **/
- public PipelineAck() {
- }
-
- /**
- * Constructor
- * @param seqno sequence number
- * @param replies an array of replies
- */
- public PipelineAck(long seqno, short[] replies) {
- this.seqno = seqno;
- this.replies = replies;
- }
-
- /**
- * Get the sequence number
- * @return the sequence number
- */
- public long getSeqno() {
- return seqno;
- }
-
- /**
- * Get the number of replies
- * @return the number of replies
- */
- public short getNumOfReplies() {
- return (short)replies.length;
- }
-
- /**
- * get the ith reply
- * @return the ith reply
- */
- public short getReply(int i) {
- return replies[i];
- }
-
- /**
- * Check if this ack contains error status
- * @return true if all statuses are SUCCESS
- */
- public boolean isSuccess() {
- for (short reply : replies) {
- if (reply != OP_STATUS_SUCCESS) {
- return false;
- }
- }
- return true;
- }
-
- /**** Writable interface ****/
- @Override // Writable
- public void readFields(DataInput in) throws IOException {
- seqno = in.readLong();
- short numOfReplies = in.readShort();
- replies = new short[numOfReplies];
- for (int i=0; i<numOfReplies; i++) {
- replies[i] = in.readShort();
- }
- }
-
- @Override // Writable
- public void write(DataOutput out) throws IOException {
- //WritableUtils.writeVLong(out, seqno);
- out.writeLong(seqno);
- out.writeShort((short)replies.length);
- for(short reply : replies) {
- out.writeShort(reply);
- }
- }
-
- @Override //Object
- public String toString() {
- StringBuilder ack = new StringBuilder("Replies for seqno ");
- ack.append( seqno ).append( " are" );
- for(short reply : replies) {
- ack.append(" ");
- if (reply == OP_STATUS_SUCCESS) {
- ack.append("SUCCESS");
- } else {
- ack.append("FAILED");
- }
- }
- return ack.toString();
- }
- }
+
+
}
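Dropping DATA_TRANSFER_VERSION back to 14 is what actually forces agreement between the two ends: data-transfer requests lead with a version short, and a mismatch aborts the connection. Roughly how that guard looks in this branch (a sketch, not the actual DataXceiver code):

    import java.io.DataInputStream;
    import java.io.IOException;

    // Sketch of the per-connection version handshake; DATA_TRANSFER_VERSION
    // is the constant reverted above (14 after this commit).
    public class VersionGuard {
        static final int DATA_TRANSFER_VERSION = 14;

        static void checkVersion(DataInputStream in) throws IOException {
            short version = in.readShort(); // first field of the request
            if (version != DATA_TRANSFER_VERSION) {
                throw new IOException("Version Mismatch");
            }
        }
    }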
Modified: hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=899508&r1=899507&r2=899508&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Jan 15 01:12:55 2010
@@ -36,7 +36,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
@@ -774,13 +773,8 @@
// send a heartbeat if it is time.
now = System.currentTimeMillis();
if (now - lastHeartbeat > datanode.socketTimeout/2) {
- PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
+ replyOut.writeLong(-1); // send heartbeat
replyOut.flush();
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " sent a heartbeat");
- }
lastHeartbeat = now;
}
}
@@ -820,8 +814,8 @@
lastPacket = true;
}
- new PipelineAck(expected, new short[]{
- DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
+ replyOut.writeLong(expected);
+ replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
replyOut.flush();
} catch (Exception e) {
if (running) {
@@ -851,21 +845,23 @@
while (running && datanode.shouldRun && !lastPacketInBlock) {
try {
+ short op = DataTransferProtocol.OP_STATUS_SUCCESS;
boolean didRead = false;
long expected = -2;
- PipelineAck ack = new PipelineAck();
try {
- // read an ack from downstream datanode
- ack.readFields(mirrorIn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
- }
- long seqno = ack.getSeqno();
+ // read seqno from downstream datanode
+ long seqno = mirrorIn.readLong();
didRead = true;
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
- ack.write(replyOut); // send keepalive
+ if (seqno == -1) {
+ replyOut.writeLong(-1); // send keepalive
replyOut.flush();
- } else if (seqno >= 0) {
+ LOG.debug("PacketResponder " + numTargets + " got -1");
+ continue;
+ } else if (seqno == -2) {
+ LOG.debug("PacketResponder " + numTargets + " got -2");
+ } else {
+ LOG.debug("PacketResponder " + numTargets + " got seqno = " +
+ seqno);
Packet pkt = null;
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -880,6 +876,7 @@
pkt = ackQueue.removeFirst();
expected = pkt.seqno;
notifyAll();
+ LOG.debug("PacketResponder " + numTargets + " seqno = " +
seqno);
if (seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
@@ -912,6 +909,10 @@
continue;
}
+ if (!didRead) {
+ op = DataTransferProtocol.OP_STATUS_ERROR;
+ }
+
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock && !receiver.finalized) {
@@ -934,34 +935,43 @@
}
}
- // construct my ack message
- short[] replies = null;
- if (!didRead) { // no ack is read
- replies = new short[2];
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
- replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
- } else {
- replies = new short[1+ack.getNumOfReplies()];
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
- for (int i=0; i<ack.getNumOfReplies(); i++) {
- replies[i+1] = ack.getReply(i);
+ // send my status back to upstream datanode
+ replyOut.writeLong(expected); // send seqno upstream
+ replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " responded my status " +
+ " for seqno " + expected);
+
+ // forward responses from downstream datanodes.
+ for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
+ try {
+ if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
+ op = mirrorIn.readShort();
+ if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ LOG.debug("PacketResponder for block " + block +
+ ": error code received from downstream " +
+ " datanode[" + i + "] " + op);
+ }
+ }
+ } catch (Throwable e) {
+ op = DataTransferProtocol.OP_STATUS_ERROR;
}
+ replyOut.writeShort(op);
}
- PipelineAck replyAck = new PipelineAck(expected, replies);
-
- // send my ack back to upstream datanode
- replyAck.write(replyOut);
replyOut.flush();
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " responded an ack: " + replyAck);
- }
+ LOG.debug("PacketResponder " + block + " " + numTargets +
+ " responded other status " + " for seqno " + expected);
+ // If we were unable to read the seqno from downstream, then stop.
+ if (expected == -2) {
+ running = false;
+ }
// If we forwarded an error response from a downstream datanode
// and we are acting on behalf of a client, then we quit. The
// client will drive the recovery mechanism.
- if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
+ if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
running = false;
}
} catch (IOException e) {
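The design difference the revert undoes is visible in this last hunk: under HDFS-793 the responder read the entire downstream PipelineAck before composing and sending one combined ack, whereas the restored code streams, writing its own status upstream first and then relaying each downstream status as it arrives, going sticky on OP_STATUS_ERROR once a read fails. A sketch of the restored relay, with names mirroring the diff:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Sketch of the reverted responder relay; replyOut goes upstream,
    // mirrorIn comes from downstream, numTargets counts downstream datanodes.
    public class AckRelay {
        static final short OP_STATUS_SUCCESS = 0; // values from DataTransferProtocol
        static final short OP_STATUS_ERROR = 1;

        static void relay(DataOutputStream replyOut, DataInputStream mirrorIn,
                          long expected, int numTargets) throws IOException {
            short op = OP_STATUS_SUCCESS;
            replyOut.writeLong(expected);           // my seqno
            replyOut.writeShort(OP_STATUS_SUCCESS); // my own status goes first
            for (int i = 0; i < numTargets; i++) {
                try {
                    if (op == OP_STATUS_SUCCESS) {
                        op = mirrorIn.readShort();  // next downstream status
                    }
                } catch (Throwable t) {
                    op = OP_STATUS_ERROR;           // downstream died mid-ack
                }
                replyOut.writeShort(op);            // relay (or repeat the error)
            }
            replyOut.flush();
        }
    }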
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=899508&r1=899507&r2=899508&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Jan 15 01:12:55 2010
@@ -250,9 +250,9 @@
sendOut.writeInt(0); // chunk length
sendOut.writeInt(0); // zero checksum
//ok finally write a block with 0 len
- Text.writeString(recvOut, "");
- new DataTransferProtocol.PipelineAck(100,
- new short[]{DataTransferProtocol.OP_STATUS_SUCCESS}).write(recvOut);
+ Text.writeString(recvOut, ""); // first bad node
+ recvOut.writeLong(100); // sequence number
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
/* Test OP_READ_BLOCK */
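The test above primes recvOut with the exact bytes the datanode must send back for the zero-length write: an empty "first bad node" string followed by a reverted-format ack. A standalone mimic of that byte sequence, assuming no Hadoop on the classpath (Text.writeString of an empty string emits a single zero-valued vint byte):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Byte-for-byte mimic of the reply primed into recvOut above.
    public class ExpectedReply {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeByte(0);   // Text.writeString(recvOut, "") - empty first bad node
            out.writeLong(100); // sequence number echoed by the datanode
            out.writeShort(0);  // OP_STATUS_SUCCESS (0 in branch-0.20)
            System.out.println("expected reply: " + buf.size() + " bytes"); // 1+8+2 = 11
        }
    }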