Author: szetszwo
Date: Mon May 14 19:07:57 2012
New Revision: 1338348
URL: http://svn.apache.org/viewvc?rev=1338348&view=rev
Log:
svn merge -c 1337430 from branch-2 for HADOOP-8366 Use ProtoBuf for RpcResponseHeader
Removed:
hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Status.java
Modified:
hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/   (props changed)
hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
Propchange: hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1337283
Merged /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common:r1337430
Modified: hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1338348&r1=1338347&r2=1338348&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/CHANGES.txt Mon May 14 19:07:57 2012
@@ -172,6 +172,8 @@ Release 2.0.0 - UNRELEASED
HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
+ HADOOP-8366 Use ProtoBuf for RpcResponseHeader (sanjay radia)
+
OPTIMIZATIONS
BUG FIXES
Propchange: hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt:r1337430
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1337283
Propchange: hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1337283
Merged /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java:r1337430
Modified: hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1338348&r1=1338347&r2=1338348&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon May 14 19:07:57 2012
@@ -53,6 +53,8 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -845,24 +847,24 @@ public class Client {
touch();
try {
- int id = in.readInt(); // try to read an id
-
+ RpcResponseHeaderProto response =
+ RpcResponseHeaderProto.parseDelimitedFrom(in);
+ int callId = response.getCallId();
if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got value #" + id);
-
- Call call = calls.get(id);
+ LOG.debug(getName() + " got value #" + callId);
- int state = in.readInt(); // read call status
- if (state == Status.SUCCESS.state) {
+ Call call = calls.get(callId);
+ RpcStatusProto status = response.getStatus();
+ if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
call.setRpcResponse(value);
- calls.remove(id);
- } else if (state == Status.ERROR.state) {
+ calls.remove(callId);
+ } else if (status == RpcStatusProto.ERROR) {
call.setException(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
- calls.remove(id);
- } else if (state == Status.FATAL.state) {
+ calls.remove(callId);
+ } else if (status == RpcStatusProto.FATAL) {
// Close the connection
markClosed(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
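
For reference, the new client-side read path boils down to: parse a length-delimited RpcResponseHeaderProto, then read either the return value or the error strings depending on the status. A minimal standalone sketch follows; the class and method names (ResponseReaderSketch, readOneResponse) are illustrative only and are not part of this change.

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
import org.apache.hadoop.util.ReflectionUtils;

class ResponseReaderSketch {
  /** Read one response: delimited protobuf header, then return value or error strings. */
  static Writable readOneResponse(DataInputStream in,
      Class<? extends Writable> valueClass, Configuration conf) throws IOException {
    // The delimited header replaces the old "int callId, int status" prefix.
    RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in);
    int callId = header.getCallId();            // would be used to look up the pending Call
    RpcStatusProto status = header.getStatus();

    if (status == RpcStatusProto.SUCCESS) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in);                     // return value follows the header
      return value;
    }
    // ERROR or FATAL: exception class name and message follow as UTF-8 strings;
    // for FATAL the caller also closes the connection.
    String exceptionClass = WritableUtils.readString(in);
    String errorMsg = WritableUtils.readString(in);
    throw new RemoteException(exceptionClass, errorMsg);
  }
}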
Modified: hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1338348&r1=1338347&r2=1338348&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon May 14 19:07:57 2012
@@ -1339,7 +1339,7 @@ public abstract class Server {
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+ ") is configured as simple. Please configure another method "
+ "like kerberos or digest.");
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+ setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
throw ae;
@@ -1420,7 +1420,7 @@ public abstract class Server {
Call fakeCall = new Call(-1, null, this);
// Versions 3 and greater can interpret this exception
// response in the same manner
- setupResponse(buffer, fakeCall, Status.FATAL,
+ setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
@@ -1443,7 +1443,7 @@ public abstract class Server {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
Call fakeCall = new Call(-1, null, this);
- setupResponse(buffer, fakeCall, Status.FATAL, null,
+ setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
IpcException.class.getName(), errMsg);
responder.doRespond(fakeCall);
}
@@ -1579,7 +1579,7 @@ public abstract class Server {
new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+ setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
IOException.class.getName(),
"Unknown rpc kind " + header.getRpcKind());
responder.doRespond(readParamsFailedCall);
@@ -1597,7 +1597,7 @@ public abstract class Server {
new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+ setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
t.getClass().getName(),
"IPC server unable to read call parameters: " + t.getMessage());
responder.doRespond(readParamsFailedCall);
@@ -1627,7 +1627,7 @@ public abstract class Server {
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
rpcMetrics.incrAuthorizationFailures();
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+ setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
return false;
@@ -1725,8 +1725,8 @@ public abstract class Server {
// responder.doResponse() since setupResponse may use
// SASL to encrypt response data and SASL enforces
// its own message ordering.
- setupResponse(buf, call, (error == null) ? Status.SUCCESS
- : Status.ERROR, value, errorClass, error);
+ setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
+ : RpcStatusProto.ERROR, value, errorClass, error);
// Discard the large buf and reset it back to smaller size
// to free up heap
@@ -1859,41 +1859,80 @@ public abstract class Server {
/**
* Setup response for the IPC Call.
*
- * @param response buffer to serialize the response into
+ * @param responseBuf buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
- * @param status {@link Status} of the IPC call
+ * @param status of the IPC call
* @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the call failed
* @param error error message, if the call failed
* @throws IOException
*/
- private void setupResponse(ByteArrayOutputStream response,
- Call call, Status status,
+ private void setupResponse(ByteArrayOutputStream responseBuf,
+ Call call, RpcStatusProto status,
Writable rv, String errorClass, String error)
throws IOException {
- response.reset();
- DataOutputStream out = new DataOutputStream(response);
- out.writeInt(call.callId); // write call id
- out.writeInt(status.state); // write status
+ responseBuf.reset();
+ DataOutputStream out = new DataOutputStream(responseBuf);
+ RpcResponseHeaderProto.Builder response =
+ RpcResponseHeaderProto.newBuilder();
+ response.setCallId(call.callId);
+ response.setStatus(status);
- if (status == Status.SUCCESS) {
+
+ if (status == RpcStatusProto.SUCCESS) {
try {
+ response.build().writeDelimitedTo(out);
rv.write(out);
} catch (Throwable t) {
LOG.warn("Error serializing call response for call " + call, t);
// Call back to same function - this is OK since the
// buffer is reset at the top, and since status is changed
// to ERROR it won't infinite loop.
- setupResponse(response, call, Status.ERROR,
+ setupResponse(responseBuf, call, RpcStatusProto.ERROR,
null, t.getClass().getName(),
StringUtils.stringifyException(t));
return;
}
} else {
+ if (status == RpcStatusProto.FATAL) {
+ response.setServerIpcVersionNum(Server.CURRENT_VERSION);
+ }
+ response.build().writeDelimitedTo(out);
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
if (call.connection.useWrap) {
+ wrapWithSasl(responseBuf, call);
+ }
+ call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
+ }
+
+ /**
+ * Setup response for the IPC Call on Fatal Error from a
+ * client that is using old version of Hadoop.
+ * The response is serialized using the previous protocol's response layout.
+ *
+ * @param response buffer to serialize the response into
+ * @param call {@link Call} to which we are setting up the response
+ * @param rv return value for the IPC Call, if the call was successful
+ * @param errorClass error class, if the call failed
+ * @param error error message, if the call failed
+ * @throws IOException
+ */
+ private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
+ Call call,
+ Writable rv, String errorClass, String error)
+ throws IOException {
+ final int OLD_VERSION_FATAL_STATUS = -1;
+ response.reset();
+ DataOutputStream out = new DataOutputStream(response);
+ out.writeInt(call.callId); // write call id
+ out.writeInt(OLD_VERSION_FATAL_STATUS); // write FATAL_STATUS
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
+
+ if (call.connection.useWrap) {
wrapWithSasl(response, call);
}
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
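
The server-side framing produced by the reworked setupResponse() can be sketched roughly as follows; ResponseWriterSketch and frameResponse are illustrative names (not Hadoop API), and the SASL wrapping step is omitted.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;

class ResponseWriterSketch {
  /** Serialize one response: delimited protobuf header, then return value or error strings. */
  static byte[] frameResponse(int callId, RpcStatusProto status, Writable value,
      String errorClass, String error, int serverIpcVersion) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);

    RpcResponseHeaderProto.Builder header = RpcResponseHeaderProto.newBuilder()
        .setCallId(callId)
        .setStatus(status);
    if (status == RpcStatusProto.FATAL) {
      // On fatal errors the server advertises its IPC version so clients
      // can report a version mismatch before the connection is closed.
      header.setServerIpcVersionNum(serverIpcVersion);
    }
    header.build().writeDelimitedTo(out);       // varint length + header bytes

    if (status == RpcStatusProto.SUCCESS) {
      value.write(out);                         // return value follows the header
    } else {
      WritableUtils.writeString(out, errorClass);
      WritableUtils.writeString(out, error);
    }
    return buf.toByteArray();
  }
}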
Modified: hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto?rev=1338348&r1=1338347&r2=1338348&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto (original)
+++ hadoop/common/branches/branch-2.0.0-alpha/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto Mon May 14 19:07:57 2012
@@ -19,7 +19,6 @@ option java_package = "org.apache.hadoop
option java_outer_classname = "RpcPayloadHeaderProtos";
option java_generate_equals_and_hash = true;
-
/**
* This is the rpc payload header. It is sent with every rpc call.
*
@@ -34,8 +33,6 @@ option java_generate_equals_and_hash = t
*
*/
-
-
/**
* RpcKind determine the rpcEngine and the serialization of the rpc payload
*/
@@ -54,5 +51,27 @@ enum RpcPayloadOperationProto {
message RpcPayloadHeaderProto { // the header for the RpcRequest
optional RpcKindProto rpcKind = 1;
optional RpcPayloadOperationProto rpcOp = 2;
- optional uint32 callId = 3; // each rpc has a callId that is also used in response
+ required uint32 callId = 3; // each rpc has a callId that is also used in response
+}
+
+enum RpcStatusProto {
+ SUCCESS = 0; // RPC succeeded
+ ERROR = 1; // RPC Failed
+ FATAL = 2; // Fatal error - connection is closed
+}
+
+/**
+ * Rpc Response Header
+ * - If successful then the Response follows after this header
+ *   - length (4 byte int), followed by the response
+ * - If error or fatal - the exception info follows
+ *   - length (4 byte int) Class name of exception - UTF-8 string
+ *   - length (4 byte int) Stacktrace - UTF-8 string
+ *   - if the strings are null then the length is -1
+ * In case of a fatal error the response contains the server side's IPC version
+ */
+message RpcResponseHeaderProto {
+ required uint32 callId = 1; // callId used in Request
+ required RpcStatusProto status = 2;
+ optional uint32 serverIpcVersionNum = 3; // in case of a fatal IPC error
}
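
To illustrate the length-delimited framing that the header comment above describes, a minimal round trip through the generated RpcResponseHeaderProto class might look like this (illustration only, not part of the commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;

class HeaderRoundTripSketch {
  public static void main(String[] args) throws IOException {
    RpcResponseHeaderProto sent = RpcResponseHeaderProto.newBuilder()
        .setCallId(42)
        .setStatus(RpcStatusProto.SUCCESS)
        .build();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    sent.writeDelimitedTo(out);                 // varint length + serialized header

    RpcResponseHeaderProto received = RpcResponseHeaderProto
        .parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(received.getCallId() + " " + received.getStatus()); // 42 SUCCESS
  }
}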