Author: sradia
Date: Thu Mar 21 16:41:28 2013
New Revision: 1459392
URL: http://svn.apache.org/r1459392
Log:
HADOOP-9380 Add totalLength to rpc response (sanjay Radia)
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1459392&r1=1459391&r2=1459392&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Mar
21 16:41:28 2013
@@ -15,6 +15,8 @@ Trunk (Unreleased)
HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending
it separately (sanjay Radia)
+ HADOOP-9380 Add totalLength to rpc response (sanjay Radia)
+
NEW FEATURES
HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1459392&r1=1459391&r2=1459392&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
(original)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
Thu Mar 21 16:41:28 2013
@@ -83,6 +83,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.Time;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -242,7 +243,7 @@ public class Client {
callComplete();
}
- public synchronized Writable getRpcResult() {
+ public synchronized Writable getRpcResponse() {
return rpcResponse;
}
}
@@ -944,11 +945,14 @@ public class Client {
touch();
try {
+ int totalLen = in.readInt();
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
if (header == null) {
throw new IOException("Response is null.");
}
+ int headerLen = header.getSerializedSize();
+ headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
int callId = header.getCallId();
if (LOG.isDebugEnabled())
@@ -961,11 +965,28 @@ public class Client {
value.readFields(in); // read value
call.setRpcResponse(value);
calls.remove(callId);
+
+ // verify that length was correct
+ // only for ProtobufEngine where len can be verified easily
+ if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
+ ProtobufRpcEngine.RpcWrapper resWrapper =
+ (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
+ if (totalLen != headerLen + resWrapper.getLength()) {
+ throw new RpcClientException(
+ "RPC response length mismatch on rpc success");
+ }
+ }
} else { // Rpc Request failed
- final String exceptionClassName = header.hasExceptionClassName() ?
+ // Verify that length was correct
+ if (totalLen != headerLen) {
+ throw new RpcClientException(
+ "RPC response length mismatch on rpc error");
+ }
+
+ final String exceptionClassName = header.hasExceptionClassName() ?
header.getExceptionClassName() :
"ServerDidNotSetExceptionClassName";
- final String errorMsg = header.hasErrorMsg() ?
+ final String errorMsg = header.hasErrorMsg() ?
header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
RemoteException re =
new RemoteException(exceptionClassName, errorMsg);
@@ -1251,7 +1272,7 @@ public class Client {
call.error);
}
} else {
- return call.getRpcResult();
+ return call.getRpcResponse();
}
}
}
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1459392&r1=1459391&r2=1459392&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
(original)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
Thu Mar 21 16:41:28 2013
@@ -48,7 +48,9 @@ import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
@@ -226,7 +228,7 @@ public class ProtobufRpcEngine implement
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
- .mergeFrom(val.responseMessage).build();
+ .mergeFrom(val.theResponseRead).build();
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
@@ -267,6 +269,9 @@ public class ProtobufRpcEngine implement
}
}
+ interface RpcWrapper extends Writable {
+ int getLength();
+ }
/**
* Wrapper for Protocol Buffer Requests
*
@@ -274,7 +279,7 @@ public class ProtobufRpcEngine implement
* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
* use type Writable as a wrapper to work across multiple RpcEngine kinds.
*/
- private static class RpcRequestWrapper implements Writable {
+ private static class RpcRequestWrapper implements RpcWrapper {
RequestHeaderProto requestHeader;
Message theRequest; // for clientSide, the request is here
byte[] theRequestRead; // for server side, the request is here
@@ -312,6 +317,22 @@ public class ProtobufRpcEngine implement
return requestHeader.getDeclaringClassProtocolName() + "." +
requestHeader.getMethodName();
}
+
+ @Override
+ public int getLength() {
+ int headerLen = requestHeader.getSerializedSize();
+ int reqLen;
+ if (theRequest != null) {
+ reqLen = theRequest.getSerializedSize();
+ } else if (theRequestRead != null ) {
+ reqLen = theRequestRead.length;
+ } else {
+ throw new IllegalArgumentException(
+ "getLenght on uninilialized RpcWrapper");
+ }
+ return CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen
+ + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;
+ }
}
/**
@@ -321,29 +342,43 @@ public class ProtobufRpcEngine implement
* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
* use type Writable as a wrapper to work across multiple RpcEngine kinds.
*/
- private static class RpcResponseWrapper implements Writable {
- byte[] responseMessage;
+ private static class RpcResponseWrapper implements RpcWrapper {
+ Message theResponse; // for senderSide, the response is here
+ byte[] theResponseRead; // for receiver side, the response is here
@SuppressWarnings("unused")
public RpcResponseWrapper() {
}
public RpcResponseWrapper(Message message) {
- this.responseMessage = message.toByteArray();
+ this.theResponse = message;
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(responseMessage.length);
- out.write(responseMessage);
+ OutputStream os = DataOutputOutputStream.constructOutputStream(out);
+ theResponse.writeDelimitedTo(os);
}
@Override
public void readFields(DataInput in) throws IOException {
- int length = in.readInt();
- byte[] bytes = new byte[length];
- in.readFully(bytes);
- responseMessage = bytes;
+ int length = ProtoUtil.readRawVarint32(in);
+ theResponseRead = new byte[length];
+ in.readFully(theResponseRead);
+ }
+
+ @Override
+ public int getLength() {
+ int resLen;
+ if (theResponse != null) {
+ resLen = theResponse.getSerializedSize();
+ } else if (theResponseRead != null ) {
+ resLen = theResponseRead.length;
+ } else {
+ throw new IllegalArgumentException(
+ "getLenght on uninilialized RpcWrapper");
+ }
+ return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;
}
}
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1459392&r1=1459391&r2=1459392&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
(original)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Thu Mar 21 16:41:28 2013
@@ -73,6 +73,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -107,6 +108,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedOutputStream;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -202,7 +204,8 @@ public abstract class Server {
// 6 : Made RPC Request header explicit
// 7 : Changed Ipc Connection Header to use Protocol buffers
// 8 : SASL server always sends a final response
- public static final byte CURRENT_VERSION = 8;
+ // 9 : Changes to protocol for HADOOP-8990
+ public static final byte CURRENT_VERSION = 9;
/**
* Initial and max size of response buffer
@@ -1512,10 +1515,15 @@ public abstract class Server {
" cannot communicate with client version " + clientVersion;
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- if (clientVersion >= 3) {
+ if (clientVersion >= 9) {
+ // Versions >>9 understand the normal response
Call fakeCall = new Call(-1, null, this);
- // Versions 3 and greater can interpret this exception
- // response in the same manner
+ setupResponse(buffer, fakeCall, RpcStatusProto.FATAL,
+ null, VersionMismatch.class.getName(), errMsg);
+ responder.doRespond(fakeCall);
+ } else if (clientVersion >= 3) {
+ Call fakeCall = new Call(-1, null, this);
+ // Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
@@ -1997,17 +2005,34 @@ public abstract class Server {
throws IOException {
responseBuf.reset();
DataOutputStream out = new DataOutputStream(responseBuf);
- RpcResponseHeaderProto.Builder response =
+ RpcResponseHeaderProto.Builder headerBuilder =
RpcResponseHeaderProto.newBuilder();
- response.setCallId(call.callId);
- response.setStatus(status);
- response.setServerIpcVersionNum(Server.CURRENT_VERSION);
-
+ headerBuilder.setCallId(call.callId);
+ headerBuilder.setStatus(status);
+ headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
if (status == RpcStatusProto.SUCCESS) {
+ RpcResponseHeaderProto header = headerBuilder.build();
+ final int headerLen = header.getSerializedSize();
+ int fullLength = CodedOutputStream.computeRawVarint32Size(headerLen) +
+ headerLen;
try {
- response.build().writeDelimitedTo(out);
- rv.write(out);
+ if (rv instanceof ProtobufRpcEngine.RpcWrapper) {
+ ProtobufRpcEngine.RpcWrapper resWrapper =
+ (ProtobufRpcEngine.RpcWrapper) rv;
+ fullLength += resWrapper.getLength();
+ out.writeInt(fullLength);
+ header.writeDelimitedTo(out);
+ rv.write(out);
+ } else { // Have to serialize to buffer to get len
+ final DataOutputBuffer buf = new DataOutputBuffer();
+ rv.write(buf);
+ byte[] data = buf.getData();
+ fullLength += buf.getLength();
+ out.writeInt(fullLength);
+ header.writeDelimitedTo(out);
+ out.write(data, 0, buf.getLength());
+ }
} catch (Throwable t) {
LOG.warn("Error serializing call response for call " + call, t);
// Call back to same function - this is OK since the
@@ -2019,9 +2044,14 @@ public abstract class Server {
return;
}
} else { // Rpc Failure
- response.setExceptionClassName(errorClass);
- response.setErrorMsg(error);
- response.build().writeDelimitedTo(out);
+ headerBuilder.setExceptionClassName(errorClass);
+ headerBuilder.setErrorMsg(error);
+ RpcResponseHeaderProto header = headerBuilder.build();
+ int headerLen = header.getSerializedSize();
+ final int fullLength =
+ CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
+ out.writeInt(fullLength);
+ header.writeDelimitedTo(out);
}
if (call.connection.useWrap) {
wrapWithSasl(responseBuf, call);