Author: todd
Date: Mon Oct 31 21:52:52 2011
New Revision: 1195692
URL: http://svn.apache.org/viewvc?rev=1195692&view=rev
Log:
HDFS-2512. Add textual error message to data transfer protocol responses.
Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1195692&r1=1195691&r2=1195692&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Mon Oct 31 21:52:52 2011
@@ -758,6 +758,9 @@ Release 0.23.0 - Unreleased
HDFS-2436. Change FSNamesystem.setTimes(..) for allowing setting times on
directories. (Uma Maheswara Rao G via szetszwo)
+ HDFS-2512. Add textual error message to data transfer protocol responses
+ (todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java?rev=1195692&r1=1195691&r2=1195692&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
Mon Oct 31 21:52:52 2011
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: datatransfer.proto
@@ -6936,6 +6935,10 @@ public final class DataTransferProtos {
boolean hasChecksumResponse();
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto
getChecksumResponse();
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProtoOrBuilder
getChecksumResponseOrBuilder();
+
+ // optional string message = 4;
+ boolean hasMessage();
+ String getMessage();
}
public static final class BlockOpResponseProto extends
com.google.protobuf.GeneratedMessage
@@ -7021,10 +7024,43 @@ public final class DataTransferProtos {
return checksumResponse_;
}
+ // optional string message = 4;
+ public static final int MESSAGE_FIELD_NUMBER = 4;
+ private java.lang.Object message_;
+ public boolean hasMessage() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getMessage() {
+ java.lang.Object ref = message_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ message_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getMessageBytes() {
+ java.lang.Object ref = message_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ message_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
status_ =
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
firstBadLink_ = "";
checksumResponse_ =
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto.getDefaultInstance();
+ message_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7057,6 +7093,9 @@ public final class DataTransferProtos {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeMessage(3, checksumResponse_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBytes(4, getMessageBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -7078,6 +7117,10 @@ public final class DataTransferProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(3, checksumResponse_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, getMessageBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -7116,6 +7159,11 @@ public final class DataTransferProtos {
result = result && getChecksumResponse()
.equals(other.getChecksumResponse());
}
+ result = result && (hasMessage() == other.hasMessage());
+ if (hasMessage()) {
+ result = result && getMessage()
+ .equals(other.getMessage());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -7137,6 +7185,10 @@ public final class DataTransferProtos {
hash = (37 * hash) + CHECKSUMRESPONSE_FIELD_NUMBER;
hash = (53 * hash) + getChecksumResponse().hashCode();
}
+ if (hasMessage()) {
+ hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
+ hash = (53 * hash) + getMessage().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@@ -7264,6 +7316,8 @@ public final class DataTransferProtos {
checksumResponseBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000004);
+ message_ = "";
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -7318,6 +7372,10 @@ public final class DataTransferProtos {
} else {
result.checksumResponse_ = checksumResponseBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.message_ = message_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -7343,6 +7401,9 @@ public final class DataTransferProtos {
if (other.hasChecksumResponse()) {
mergeChecksumResponse(other.getChecksumResponse());
}
+ if (other.hasMessage()) {
+ setMessage(other.getMessage());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -7409,6 +7470,11 @@ public final class DataTransferProtos {
setChecksumResponse(subBuilder.buildPartial());
break;
}
+ case 34: {
+ bitField0_ |= 0x00000008;
+ message_ = input.readBytes();
+ break;
+ }
}
}
}
@@ -7565,6 +7631,42 @@ public final class DataTransferProtos {
return checksumResponseBuilder_;
}
+ // optional string message = 4;
+ private java.lang.Object message_ = "";
+ public boolean hasMessage() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getMessage() {
+ java.lang.Object ref = message_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ message_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setMessage(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ message_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearMessage() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ message_ = getDefaultInstance().getMessage();
+ onChanged();
+ return this;
+ }
+ void setMessage(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000008;
+ message_ = value;
+ onChanged();
+ }
+
// @@protoc_insertion_point(builder_scope:BlockOpResponseProto)
}
@@ -8995,19 +9097,20 @@ public final class DataTransferProtos {
"\030\001 \002(\020\022\r\n\005seqno\030\002
\002(\020\022\031\n\021lastPacketInBlo" +
"ck\030\003 \002(\010\022\017\n\007dataLen\030\004
\002(\017\":\n\020PipelineAck" +
"Proto\022\r\n\005seqno\030\001 \002(\022\022\027\n\006status\030\002
\003(\0162\007.S" +
- "tatus\"~\n\024BlockOpResponseProto\022\027\n\006status\030" +
- "\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002
\001(\t\0227\n\020" +
- "checksumResponse\030\003 \001(\0132\035.OpBlockChecksum" +
- "ResponseProto\"0\n\025ClientReadStatusProto\022\027" +
- "\n\006status\030\001 \002(\0162\007.Status\"-\n\022DNTransferAck" +
- "Proto\022\027\n\006status\030\001 \002(\0162\007.Status\"U\n\034OpBloc",
- "kChecksumResponseProto\022\023\n\013bytesPerCrc\030\001 " +
- "\002(\r\022\023\n\013crcPerBlock\030\002
\002(\004\022\013\n\003md5\030\003 \002(\014*\202\001" +
- "\n\006Status\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\022\n\016ERRO" +
- "R_CHECKSUM\020\002\022\021\n\rERROR_INVALID\020\003\022\020\n\014ERROR" +
- "_EXISTS\020\004\022\026\n\022ERROR_ACCESS_TOKEN\020\005\022\017\n\013CHE" +
- "CKSUM_OK\020\006B>\n%org.apache.hadoop.hdfs.pro" +
- "tocol.protoB\022DataTransferProtos\240\001\001"
+ "tatus\"\217\001\n\024BlockOpResponseProto\022\027\n\006status" +
+ "\030\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002
\001(\t\0227\n" +
+ "\020checksumResponse\030\003 \001(\0132\035.OpBlockChecksu" +
+ "mResponseProto\022\017\n\007message\030\004 \001(\t\"0\n\025Clien" +
+ "tReadStatusProto\022\027\n\006status\030\001 \002(\0162\007.Statu" +
+ "s\"-\n\022DNTransferAckProto\022\027\n\006status\030\001 \002(\0162",
+ "\007.Status\"U\n\034OpBlockChecksumResponseProto" +
+ "\022\023\n\013bytesPerCrc\030\001
\002(\r\022\023\n\013crcPerBlock\030\002 \002" +
+ "(\004\022\013\n\003md5\030\003
\002(\014*\202\001\n\006Status\022\013\n\007SUCCESS\020\000\022" +
+ "\t\n\005ERROR\020\001\022\022\n\016ERROR_CHECKSUM\020\002\022\021\n\rERROR_" +
+ "INVALID\020\003\022\020\n\014ERROR_EXISTS\020\004\022\026\n\022ERROR_ACC" +
+ "ESS_TOKEN\020\005\022\017\n\013CHECKSUM_OK\020\006B>\n%org.apac" +
+ "he.hadoop.hdfs.protocol.protoB\022DataTrans" +
+ "ferProtos\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner
assigner =
new
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9099,7 +9202,7 @@ public final class DataTransferProtos {
internal_static_BlockOpResponseProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_BlockOpResponseProto_descriptor,
- new java.lang.String[] { "Status", "FirstBadLink",
"ChecksumResponse", },
+ new java.lang.String[] { "Status", "FirstBadLink",
"ChecksumResponse", "Message", },
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.class,
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder.class);
internal_static_ClientReadStatusProto_descriptor =
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1195692&r1=1195691&r2=1195692&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
Mon Oct 31 21:52:52 2011
@@ -358,7 +358,8 @@ public class Balancer {
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
throw new IOException("block move failed due to access token error");
- throw new IOException("block move is failed");
+ throw new IOException("block move is failed: " +
+ response.getMessage());
}
}
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1195692&r1=1195691&r2=1195692&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Mon Oct 31 21:52:52 2011
@@ -48,6 +48,8 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
import
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -225,13 +227,14 @@ class DataXceiver extends Receiver imple
blockSender = new BlockSender(block, blockOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
- LOG.info("opReadBlock " + block + " received exception " + e);
- sendResponse(s, ERROR, datanode.socketWriteTimeout);
+ String msg = "opReadBlock " + block + " received exception " + e;
+ LOG.info(msg);
+ sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
throw e;
}
// send op status
- sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
+ sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
long read = blockSender.sendBlock(out, baseStream, null); // send data
@@ -452,7 +455,7 @@ class DataXceiver extends Receiver imple
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
- writeResponse(SUCCESS, replyOut);
+ writeResponse(SUCCESS, null, replyOut);
}
}
@@ -507,7 +510,7 @@ class DataXceiver extends Receiver imple
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
- writeResponse(Status.SUCCESS, out);
+ writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
}
@@ -577,16 +580,17 @@ class DataXceiver extends Receiver imple
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_COPY_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
- sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+ sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
return;
}
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- LOG.info("Not able to copy block " + block.getBlockId() + " to "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- sendResponse(s, ERROR, datanode.socketWriteTimeout);
+ String msg = "Not able to copy block " + block.getBlockId() + " to "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+ LOG.info(msg);
+ sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
return;
}
@@ -606,7 +610,7 @@ class DataXceiver extends Receiver imple
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
// send status first
- writeResponse(SUCCESS, reply);
+ writeResponse(SUCCESS, null, reply);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@@ -653,21 +657,24 @@ class DataXceiver extends Receiver imple
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_REPLACE_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
- sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+ sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
+ datanode.socketWriteTimeout);
return;
}
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- LOG.warn("Not able to receive block " + block.getBlockId() + " from "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- sendResponse(s, ERROR, datanode.socketWriteTimeout);
+ String msg = "Not able to receive block " + block.getBlockId() + " from "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+ LOG.warn(msg);
+ sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
return;
}
Socket proxySock = null;
DataOutputStream proxyOut = null;
Status opStatus = SUCCESS;
+ String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
@@ -720,7 +727,8 @@ class DataXceiver extends Receiver imple
} catch (IOException ioe) {
opStatus = ERROR;
- LOG.info("opReplaceBlock " + block + " received exception " + ioe);
+ errMsg = "opReplaceBlock " + block + " received exception " + ioe;
+ LOG.info(errMsg);
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
@@ -736,7 +744,7 @@ class DataXceiver extends Receiver imple
// send response back
try {
- sendResponse(s, opStatus, datanode.socketWriteTimeout);
+ sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
@@ -759,21 +767,22 @@ class DataXceiver extends Receiver imple
* @param opStatus status message to write
* @param timeout send timeout
**/
- private void sendResponse(Socket s, Status status,
+ private void sendResponse(Socket s, Status status, String message,
long timeout) throws IOException {
DataOutputStream reply =
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
- writeResponse(status, reply);
+ writeResponse(status, message, reply);
}
- private void writeResponse(Status status, OutputStream out)
+ private void writeResponse(Status status, String message, OutputStream out)
throws IOException {
- BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
- .setStatus(status)
- .build();
-
- response.writeDelimitedTo(out);
+ BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+ .setStatus(status);
+ if (message != null) {
+ response.setMessage(message);
+ }
+ response.build().writeDelimitedTo(out);
out.flush();
}
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto?rev=1195692&r1=1195691&r2=1195692&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
Mon Oct 31 21:52:52 2011
@@ -119,6 +119,9 @@ message BlockOpResponseProto {
optional string firstBadLink = 2;
optional OpBlockChecksumResponseProto checksumResponse = 3;
+
+ /** explanatory text which may be useful to log on the client side */
+ optional string message = 4;
}
/**
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1195692&r1=1195691&r2=1195692&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Mon Oct 31 21:52:52 2011
@@ -117,10 +117,8 @@ public class TestDataTransferProtocol ex
throw eof;
}
- LOG.info("Received: " +
- StringUtils.byteToHexString(retBuf));
- LOG.info("Expected: " +
- StringUtils.byteToHexString(recvBuf.toByteArray()));
+ LOG.info("Received: " +new String(retBuf));
+ LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
if (eofExpected) {
throw new IOException("Did not recieve IOException when an exception "
+
@@ -129,10 +127,8 @@ public class TestDataTransferProtocol ex
}
byte[] needed = recvBuf.toByteArray();
- for (int i=0; i<retBuf.length; i++) {
- System.out.print(retBuf[i]);
- assertEquals("checking byte[" + i + "]", needed[i], retBuf[i]);
- }
+ assertEquals(StringUtils.byteToHexString(needed),
+ StringUtils.byteToHexString(retBuf));
} finally {
IOUtils.closeSocket(sock);
}
@@ -166,18 +162,22 @@ public class TestDataTransferProtocol ex
sendOut.writeInt(0); // zero checksum
//ok finally write a block with 0 len
- sendResponse(Status.SUCCESS, "", recvOut);
+ sendResponse(Status.SUCCESS, "", null, recvOut);
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
sendRecvData(description, false);
}
private void sendResponse(Status status, String firstBadLink,
+ String message,
DataOutputStream out)
throws IOException {
Builder builder = BlockOpResponseProto.newBuilder().setStatus(status);
if (firstBadLink != null) {
builder.setFirstBadLink(firstBadLink);
}
+ if (message != null) {
+ builder.setMessage(message);
+ }
builder.build()
.writeDelimitedTo(out);
}
@@ -190,11 +190,11 @@ public class TestDataTransferProtocol ex
new DatanodeInfo[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS);
if (eofExcepted) {
- sendResponse(Status.ERROR, null, recvOut);
+ sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData(description, true);
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
//ok finally write a block with 0 len
- sendResponse(Status.SUCCESS, "", recvOut);
+ sendResponse(Status.SUCCESS, "", null, recvOut);
sendRecvData(description, false);
} else {
writeZeroLengthPacket(block, description);
@@ -383,7 +383,7 @@ public class TestDataTransferProtocol ex
// bad bytes per checksum
sendOut.writeInt(-1-random.nextInt(oneMil));
recvBuf.reset();
- sendResponse(Status.ERROR, null, recvOut);
+ sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
sendBuf.reset();
@@ -403,7 +403,7 @@ public class TestDataTransferProtocol ex
-1 - random.nextInt(oneMil)); // bad datalen
hdr.write(sendOut);
- sendResponse(Status.SUCCESS, "", recvOut);
+ sendResponse(Status.SUCCESS, "", null, recvOut);
new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
true);
@@ -428,7 +428,7 @@ public class TestDataTransferProtocol ex
sendOut.writeInt(0); // zero checksum
sendOut.flush();
//ok finally write a block with 0 len
- sendResponse(Status.SUCCESS, "", recvOut);
+ sendResponse(Status.SUCCESS, "", null, recvOut);
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
@@ -462,7 +462,7 @@ public class TestDataTransferProtocol ex
// negative length is ok. Datanode assumes we want to read the whole block.
recvBuf.reset();
- sendResponse(Status.SUCCESS, null, recvOut);
+ sendResponse(Status.SUCCESS, null, null, recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil));
@@ -471,7 +471,11 @@ public class TestDataTransferProtocol ex
// length is more than size of block.
recvBuf.reset();
- sendResponse(Status.ERROR, null, recvOut);
+ sendResponse(Status.ERROR, null,
+ "opReadBlock " + firstBlock +
+ " received exception java.io.IOException: " +
+ "Offset 0 and length 4097 don't match block " + firstBlock + " ( blockLen 4096 )",
+ recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen+1);