Author: szetszwo
Date: Sat May 12 01:49:52 2012
New Revision: 1337430

URL: http://svn.apache.org/viewvc?rev=1337430&view=rev
Log:
svn merge -c 1337283 from trunk for HADOOP-8366 Use ProtoBuf for 
RpcResponseHeader.

Removed:
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Status.java
Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/   
(props changed)
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt 
  (contents, props changed)
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/
   (props changed)
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1337283

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1337430&r1=1337429&r2=1337430&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt 
(original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt 
Sat May 12 01:49:52 2012
@@ -184,6 +184,8 @@ Release 2.0.0 - UNRELEASED
 
     HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
 
+    HADOOP-8366 Use ProtoBuf for RpcResponseHeader (sanjay radia)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
  Merged 
/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1337283

Propchange: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged 
/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1337283

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1337430&r1=1337429&r2=1337430&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 Sat May 12 01:49:52 2012
@@ -53,6 +53,8 @@ import org.apache.hadoop.fs.CommonConfig
 import 
org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import 
org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
 import 
org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
+import 
org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -845,24 +847,24 @@ public class Client {
       touch();
       
       try {
-        int id = in.readInt();                    // try to read an id
-
+        RpcResponseHeaderProto response = 
+            RpcResponseHeaderProto.parseDelimitedFrom(in);
+        int callId = response.getCallId();
         if (LOG.isDebugEnabled())
-          LOG.debug(getName() + " got value #" + id);
-
-        Call call = calls.get(id);
+          LOG.debug(getName() + " got value #" + callId);
 
-        int state = in.readInt();     // read call status
-        if (state == Status.SUCCESS.state) {
+        Call call = calls.get(callId);
+        RpcStatusProto status = response.getStatus();
+        if (status == RpcStatusProto.SUCCESS) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setRpcResponse(value);
-          calls.remove(id);
-        } else if (state == Status.ERROR.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.ERROR) {
           call.setException(new RemoteException(WritableUtils.readString(in),
                                                 WritableUtils.readString(in)));
-          calls.remove(id);
-        } else if (state == Status.FATAL.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.FATAL) {
           // Close the connection
           markClosed(new RemoteException(WritableUtils.readString(in), 
                                          WritableUtils.readString(in)));

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1337430&r1=1337429&r2=1337430&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 Sat May 12 01:49:52 2012
@@ -1339,7 +1339,7 @@ public abstract class Server {
               + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
               + ") is configured as simple. Please configure another method "
               + "like kerberos or digest.");
-            setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+            setupResponse(authFailedResponse, authFailedCall, 
RpcStatusProto.FATAL,
                 null, ae.getClass().getName(), ae.getMessage());
             responder.doRespond(authFailedCall);
             throw ae;
@@ -1420,7 +1420,7 @@ public abstract class Server {
         Call fakeCall =  new Call(-1, null, this);
         // Versions 3 and greater can interpret this exception
         // response in the same manner
-        setupResponse(buffer, fakeCall, Status.FATAL,
+        setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
         responder.doRespond(fakeCall);
@@ -1443,7 +1443,7 @@ public abstract class Server {
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 
       Call fakeCall = new Call(-1, null, this);
-      setupResponse(buffer, fakeCall, Status.FATAL, null,
+      setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
           IpcException.class.getName(), errMsg);
       responder.doRespond(fakeCall);
     }
@@ -1579,7 +1579,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, 
RpcStatusProto.FATAL, null,
             IOException.class.getName(),
             "Unknown rpc kind "  + header.getRpcKind());
         responder.doRespond(readParamsFailedCall);
@@ -1597,7 +1597,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, 
RpcStatusProto.FATAL, null,
             t.getClass().getName(),
             "IPC server unable to read call parameters: " + t.getMessage());
         responder.doRespond(readParamsFailedCall);
@@ -1627,7 +1627,7 @@ public abstract class Server {
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
         rpcMetrics.incrAuthorizationFailures();
-        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+        setupResponse(authFailedResponse, authFailedCall, 
RpcStatusProto.FATAL, null,
             ae.getClass().getName(), ae.getMessage());
         responder.doRespond(authFailedCall);
         return false;
@@ -1725,8 +1725,8 @@ public abstract class Server {
             // responder.doResponse() since setupResponse may use
             // SASL to encrypt response data and SASL enforces
             // its own message ordering.
-            setupResponse(buf, call, (error == null) ? Status.SUCCESS
-                : Status.ERROR, value, errorClass, error);
+            setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
+                : RpcStatusProto.ERROR, value, errorClass, error);
             
             // Discard the large buf and reset it back to smaller size 
             // to free up heap
@@ -1859,41 +1859,80 @@ public abstract class Server {
   /**
    * Setup response for the IPC Call.
    * 
-   * @param response buffer to serialize the response into
+   * @param responseBuf buffer to serialize the response into
    * @param call {@link Call} to which we are setting up the response
-   * @param status {@link Status} of the IPC call
+   * @param status of the IPC call
    * @param rv return value for the IPC Call, if the call was successful
    * @param errorClass error class, if the the call failed
    * @param error error message, if the call failed
    * @throws IOException
    */
-  private void setupResponse(ByteArrayOutputStream response, 
-                             Call call, Status status, 
+  private void setupResponse(ByteArrayOutputStream responseBuf,
+                             Call call, RpcStatusProto status, 
                              Writable rv, String errorClass, String error) 
   throws IOException {
-    response.reset();
-    DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.callId);                // write call id
-    out.writeInt(status.state);           // write status
+    responseBuf.reset();
+    DataOutputStream out = new DataOutputStream(responseBuf);
+    RpcResponseHeaderProto.Builder response =  
+        RpcResponseHeaderProto.newBuilder();
+    response.setCallId(call.callId);
+    response.setStatus(status);
 
-    if (status == Status.SUCCESS) {
+
+    if (status == RpcStatusProto.SUCCESS) {
       try {
+        response.build().writeDelimitedTo(out);
         rv.write(out);
       } catch (Throwable t) {
         LOG.warn("Error serializing call response for call " + call, t);
         // Call back to same function - this is OK since the
         // buffer is reset at the top, and since status is changed
         // to ERROR it won't infinite loop.
-        setupResponse(response, call, Status.ERROR,
+        setupResponse(responseBuf, call, RpcStatusProto.ERROR,
             null, t.getClass().getName(),
             StringUtils.stringifyException(t));
         return;
       }
     } else {
+      if (status == RpcStatusProto.FATAL) {
+        response.setServerIpcVersionNum(Server.CURRENT_VERSION);
+      }
+      response.build().writeDelimitedTo(out);
       WritableUtils.writeString(out, errorClass);
       WritableUtils.writeString(out, error);
     }
     if (call.connection.useWrap) {
+      wrapWithSasl(responseBuf, call);
+    }
+    call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
+  }
+  
+  /**
+   * Setup response for the IPC Call on Fatal Error from a 
+   * client that is using old version of Hadoop.
+   * The response is serialized using the previous protocol's response
+   * layout.
+   * 
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the the call failed
+   * @param error error message, if the call failed
+   * @throws IOException
+   */
+  private void setupResponseOldVersionFatal(ByteArrayOutputStream response, 
+                             Call call,
+                             Writable rv, String errorClass, String error) 
+  throws IOException {
+    final int OLD_VERSION_FATAL_STATUS = -1;
+    response.reset();
+    DataOutputStream out = new DataOutputStream(response);
+    out.writeInt(call.callId);                // write call id
+    out.writeInt(OLD_VERSION_FATAL_STATUS);   // write FATAL_STATUS
+    WritableUtils.writeString(out, errorClass);
+    WritableUtils.writeString(out, error);
+
+    if (call.connection.useWrap) {
       wrapWithSasl(response, call);
     }
     call.setResponse(ByteBuffer.wrap(response.toByteArray()));

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto?rev=1337430&r1=1337429&r2=1337430&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
 Sat May 12 01:49:52 2012
@@ -19,7 +19,6 @@ option java_package = "org.apache.hadoop
 option java_outer_classname = "RpcPayloadHeaderProtos";
 option java_generate_equals_and_hash = true;
 
-
 /**
  * This is the rpc payload header. It is sent with every rpc call.
  * 
@@ -34,8 +33,6 @@ option java_generate_equals_and_hash = t
  *
  */
 
-
-
 /**
  * RpcKind determine the rpcEngine and the serialization of the rpc payload
  */
@@ -54,5 +51,27 @@ enum RpcPayloadOperationProto {
 message RpcPayloadHeaderProto { // the header for the RpcRequest
   optional RpcKindProto rpcKind = 1;
   optional RpcPayloadOperationProto rpcOp = 2;
-  optional uint32 callId = 3; // each rpc has a callId that is also used in 
response
+  required uint32 callId = 3; // each rpc has a callId that is also used in 
response
+}
+
+enum RpcStatusProto {
+ SUCCESS = 0;  // RPC succeeded
+ ERROR = 1;    // RPC Failed
+ FATAL = 2;    // Fatal error - connection is closed
+}
+
+/**
+ * Rpc Response Header
+ *    - If successfull then the Respose follows after this header
+ *        - length (4 byte int), followed by the response
+ *    - If error or fatal - the exception info follow
+ *        - length (4 byte int) Class name of exception - UTF-8 string
+ *        - length (4 byte int) Stacktrace - UTF-8 string
+ *        - if the strings are null then the length is -1
+ * In case of Fatal error then the respose contains the Serverside's IPC 
version
+ */
+message RpcResponseHeaderProto {
+  required uint32 callId = 1; // callId used in Request
+  required RpcStatusProto status = 2;
+  optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error 
 }


Reply via email to