Author: cmccabe
Date: Thu May 15 20:08:36 2014
New Revision: 1595028

URL: http://svn.apache.org/r1595028
Log:
HADOOP-10564. Add username to native RPCv9 client (cmccabe)

Added:
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h
Modified:
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt Thu May 15 20:08:36 2014
@@ -55,6 +55,7 @@ set(COMMON_SRCS
     common/net.c
     common/string.c
     common/test.c
+    common/user.c
 )
 set(COMMON_DEPS
     pthread

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c?rev=1595028&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c Thu May 15 20:08:36 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/user.h"
+
+#include <errno.h>
+#include <pwd.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#define GETPWUID_R_BUFLEN_MIN 4096
+#define GETPWUID_R_BUFLEN_MAX (8 * 1024 * 1024)
+
+/**
+ * Get the user name associated with a given numeric user ID.
+ *
+ * The POSIX APIs to do this require a little more code than you might expect.
+ * We have to supply a buffer to the getpwuid_r call.  But how big should it
+ * be?  We can call sysconf to get a "suggested size," but this may or may not
+ * be big enough for our particular user.  Also, on some platforms, sysconf
+ * returns -1 for this.  So we have a loop which keeps doubling the size of the
+ * buffer.  As a sanity check against buggy libraries, we give up when we reach
+ * some ridiculous buffer size.
+ *
+ * @param uid           UID to look up
+ * @param out           (out param) on success, the user name (malloc'ed; caller must free).
+ *
+ * @return              0 on success; error code on failure.
+ */
+static int uid_to_string(uid_t uid, char **out)
+{
+    int ret;
+    struct passwd pwd, *result;
+    char *buf = NULL, *nbuf;
+    size_t buflen, nbuflen;
+
+    buflen = sysconf(_SC_GETPW_R_SIZE_MAX);
+    if (((long)buflen < 0) || (buflen < GETPWUID_R_BUFLEN_MIN)) {
+        buflen = GETPWUID_R_BUFLEN_MIN;
+    }
+    while (1) {
+        nbuf = realloc(buf, buflen);
+        if (!nbuf) {
+            ret = ENOMEM;
+            goto done;
+        }
+        buf = nbuf;
+        ret = getpwuid_r(uid, &pwd, buf, buflen, &result);
+        if (ret == 0)
+            break;
+        if (ret != ERANGE) {
+            fprintf(stderr, "uid_to_string: getpwuid_r(%lld) failed "
+                    "with error %d\n", (long long)uid, ret);
+            goto done;
+        }
+        nbuflen = buflen * 2;
+        if (nbuflen > GETPWUID_R_BUFLEN_MAX) {
+            nbuflen = GETPWUID_R_BUFLEN_MAX;
+        }
+        if (buflen == nbuflen) {
+            fprintf(stderr, "uid_to_string: getpwuid_r(%lld) still gets "
+                    "ERANGE with buflen %zu\n", (long long)uid, buflen);
+            ret = ERANGE;
+            goto done;
+        }
+        buflen = nbuflen;
+    }
+    if (!result) {
+        ret = ENOENT;
+        fprintf(stderr, "uid_to_string: getpwuid_r(%lld): no name "
+                "found for this user id.\n", (long long)uid);
+        goto done;
+    }
+    *out = strdup(result->pw_name);
+    if (!*out) {
+        ret = ENOMEM;
+        goto done;
+    }
+
+done:
+    free(buf);
+    return ret;
+}
+
+int geteuid_string(char **out)
+{
+    return uid_to_string(geteuid(), out);
+}
+
+// vim: ts=4:sw=4:tw=79:et
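
A minimal usage sketch of the new helper (illustrative, not part of the
commit).  The returned string is malloc'ed, so the caller owns it and must
free it:

    char *username = NULL;
    int ret = geteuid_string(&username);

    if (ret) {
        fprintf(stderr, "geteuid_string failed with error %d\n", ret);
    } else {
        printf("effective user: %s\n", username);
        free(username);
    }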

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h?rev=1595028&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h Thu May 15 20:08:36 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_COMMON_USER_H
+#define HADOOP_CORE_COMMON_USER_H
+
+/**
+ * Get the current effective user name.
+ *
+ * @param out           (out param) The name of the current user.
+ *                      A malloc'ed string which must be freed.
+ *
+ * @return              0 on success; error code otherwise.
+ */
+int geteuid_string(char **out);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c Thu May 15 20:08:36 2014
@@ -18,6 +18,7 @@
 
 #include "common/hadoop_err.h"
 #include "common/test.h"
+#include "common/user.h"
 #include "protobuf/ClientNamenodeProtocol.call.h"
 #include "rpc/messenger.h"
 #include "rpc/proxy.h"
@@ -31,12 +32,14 @@
 
 struct options {
     struct sockaddr_in remote;
+    char *username;
 };
 
 static void options_from_env(struct options *opts)
 {
     const char *ip_str;
     const char *port_str;
+    const char *username;
     int res, port;
 
     ip_str = getenv("HDFS_IP");
@@ -58,6 +61,19 @@ static void options_from_env(struct opti
                 ip_str, port, uv_strerror(res));
         exit(EXIT_FAILURE);
     }
+    username = getenv("HDFS_USERNAME");
+    if (username) {
+        opts->username = strdup(username);
+        if (!opts->username)
+            abort();
+        fprintf(stderr, "using HDFS username %s\n", username);
+    } else {
+        res = geteuid_string(&opts->username);
+        if (res) {
+            fprintf(stderr, "geteuid_string failed with error %d\n", res);
+            abort();
+        }
+    }
 }
 
 void set_replication_cb(SetReplicationResponseProto *resp,
@@ -87,9 +103,8 @@ void set_replication_cb(SetReplicationRe
 int main(void)
 {
     struct hrpc_messenger_builder *msgr_bld;
-    struct hrpc_proxy_builder *proxy_bld;
-    struct hrpc_proxy *proxy;
     struct hrpc_messenger *msgr;
+    struct hrpc_proxy proxy;
     struct options opts;
     uv_sem_t sem;
 
@@ -98,26 +113,24 @@ int main(void)
     msgr_bld = hrpc_messenger_builder_alloc();
     EXPECT_NONNULL(msgr_bld);
     EXPECT_NO_HADOOP_ERR(hrpc_messenger_create(msgr_bld, &msgr));
-    proxy_bld = hrpc_proxy_builder_alloc(msgr);
-    EXPECT_NONNULL(proxy_bld);
-    hrpc_proxy_builder_set_remote(proxy_bld, &opts.remote);
-    hrpc_proxy_builder_set_protocol(proxy_bld,
-                "org.apache.hadoop.hdfs.protocol.ClientProtocol");
-    EXPECT_NO_HADOOP_ERR(hrpc_proxy_create(proxy_bld, &proxy));
+
+    hrpc_proxy_init(&proxy, msgr, &opts.remote,
+            "org.apache.hadoop.hdfs.protocol.ClientProtocol",
+            opts.username);
     EXPECT_INT_ZERO(uv_sem_init(&sem, 0));
     {
         SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
         req.src = "/foo2";
         req.replication = 2;
-        cnn_async_set_replication(proxy, &req, set_replication_cb, &sem);
+        cnn_async_set_replication(&proxy, &req, set_replication_cb, &sem);
     }
     uv_sem_wait(&sem);
 
-    hrpc_proxy_free(proxy);
     hrpc_messenger_shutdown(msgr);
     hrpc_messenger_free(msgr);
     uv_sem_destroy(&sem);
 
+    free(opts.username);
     return EXIT_SUCCESS;
 }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h Thu May 15 20:08:36 2014
@@ -20,12 +20,14 @@
 #define HADOOP_CORE_RPC_CALL_H
 
 #include "common/queue.h"
-#include "rpc/proxy.h" // for hrpc_raw_cb_t 
 
 #include <netinet/in.h> // for struct sockaddr_in
 #include <stdint.h> // for int32_t
 #include <uv.h> // for uv_buf_t
 
+struct hadoop_err;
+struct hrpc_response;
+
 /**
  * The Hadoop call.
  *
@@ -33,7 +35,8 @@
  * include.
  */
 
-struct hrpc_call;
+typedef void (*hrpc_raw_cb_t)(struct hrpc_response *,
+    struct hadoop_err *, void *);
 
 /**
  * A Hadoop RPC call.
@@ -61,14 +64,19 @@ struct hrpc_call {
     void *cb_data;
 
     /**
-     * Malloc'ed payload to send.  Malloced.
+     * Malloc'ed payload to send.
      */
     uv_buf_t payload;
 
     /**
-     * The Java protocol we're using.  Malloced.
+     * String describing the protocol this call is using.
+     */
+    const char *protocol;
+
+    /**
+     * String describing the username this call is using. 
      */
-    char *protocol;
+    const char *username;
 
     /**
      * Nonzero if the call is currently active.  Must be set using atomic

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c Thu May 15 20:08:36 2014
@@ -24,10 +24,12 @@
 #include "rpc/call.h"
 #include "rpc/conn.h"
 #include "rpc/messenger.h"
+#include "rpc/proxy.h"
 #include "rpc/reactor.h"
 #include "rpc/varint.h"
 
 #include <errno.h>
+#include <inttypes.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -58,6 +60,8 @@ static const uint8_t FRAME[] = {
 
 #define FRAME_LEN sizeof(FRAME)
 
+#define CONNECTION_CONTEXT_CALL_ID (-3)
+
 static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
                          const uv_buf_t* buf);
 static void hrpc_conn_read_alloc(uv_handle_t *handle,
@@ -98,6 +102,7 @@ static struct hadoop_err *conn_setup_ipc
     struct hrpc_conn_writer *writer = &conn->writer;
     IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT;
     RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
+    UserInformationProto userinfo = USER_INFORMATION_PROTO__INIT;
     int32_t cset_len, buf_len;
     int32_t ipc_ctx_len, rpc_req_header_len, off = 0;
     uint8_t *buf;
@@ -107,14 +112,15 @@ static struct hadoop_err *conn_setup_ipc
     rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
     rpc_req_header.has_rpcop = 1;
     rpc_req_header.rpcop = RPC_FINAL_PACKET;
-    rpc_req_header.callid = -3; // ???
+    rpc_req_header.callid = CONNECTION_CONTEXT_CALL_ID;
     rpc_req_header.clientid.data = conn->client_id.buf;
     rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
     rpc_req_header.has_retrycount = 0;
     rpc_req_header_len =
         rpc_request_header_proto__get_packed_size(&rpc_req_header);
-
-    ipc_ctx.userinfo = NULL;
+    userinfo.effectiveuser = conn->username;
+    userinfo.realuser = NULL;
+    ipc_ctx.userinfo = &userinfo;
     ipc_ctx.protocol = conn->protocol;
     ipc_ctx_len = ipc_connection_context_proto__get_packed_size(&ipc_ctx);
 
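
For illustration (not part of the commit): a sketch of packing a standalone
connection context that carries the new UserInformationProto, using
protobuf-c's generated __get_packed_size/__pack pair.  The variables
"my_username" and "my_protocol" are hypothetical inputs:

    IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT;
    UserInformationProto userinfo = USER_INFORMATION_PROTO__INIT;
    int32_t len;
    uint8_t *buf;

    userinfo.effectiveuser = my_username;  /* e.g. from geteuid_string() */
    userinfo.realuser = NULL;              /* no impersonation */
    ipc_ctx.userinfo = &userinfo;
    ipc_ctx.protocol = my_protocol;
    len = ipc_connection_context_proto__get_packed_size(&ipc_ctx);
    buf = malloc(len);
    if (buf)
        ipc_connection_context_proto__pack(&ipc_ctx, buf);
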
@@ -156,7 +162,7 @@ static struct hadoop_err *hrpc_conn_setu
     rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
     rpc_req_header.has_rpcop = 1;
     rpc_req_header.rpcop = RPC_FINAL_PACKET;
-    rpc_req_header.callid = 0; // ???
+    rpc_req_header.callid = conn->call_id;
     rpc_req_header.clientid.data = conn->client_id.buf;
     rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
     rpc_req_header.has_retrycount = 0;
@@ -168,7 +174,7 @@ static struct hadoop_err *hrpc_conn_setu
     total_len64 += call->payload.len;
     if (total_len64 > MAX_READER_BODY_LEN) {
         err = hadoop_lerr_alloc(EINVAL, "hrpc_conn_setup_payload: "
-                "can't send a payload of length %"PRId64".  The maximum "
+                "can't send a payload of length %" PRId64 ".  The maximum "
                 "payload length is %d", total_len64, MAX_READER_BODY_LEN);
         return err;
     }
@@ -255,6 +261,12 @@ static void conn_start_outbound(struct h
     struct hrpc_conn_writer *writer = &conn->writer;
     int res;
 
+    // Get next call ID to use.
+    if (conn->call_id >= 0x7fffffff) {
+        conn->call_id = 1;
+    } else {
+        conn->call_id++;
+    }
     writer->state = HRPC_CONN_WRITE_IPC_HEADER;
     err = conn_setup_ipc_header(conn);
     if (err) {
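
Side note (illustrative, not in the commit): the wraparound above restarts at
1 rather than 0, keeping outgoing call IDs clear of the non-positive values
reserved for control messages such as the connection context
(CONNECTION_CONTEXT_CALL_ID, -3).  The same logic as a standalone helper:

    static int32_t next_call_id(int32_t cur)
    {
        /* Positive IDs only; wrap back to 1 at INT32_MAX. */
        return (cur >= 0x7fffffff) ? 1 : cur + 1;
    }
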
@@ -317,7 +329,8 @@ struct hadoop_err *hrpc_conn_create_outb
     conn->call = call;
     conn->remote = call->remote;
     conn->protocol = strdup(call->protocol);
-    if (!conn->protocol) {
+    conn->username = strdup(call->username);
+    if ((!conn->protocol) || (!conn->username)) {
         err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
         goto done;
     }
@@ -344,6 +357,7 @@ done:
     if (err) {
         if (conn) {
             free(conn->protocol);
+            free(conn->username);
             if (tcp_init) {
                 uv_close((uv_handle_t*)&conn->stream, NULL);
             }
@@ -357,7 +371,7 @@ done:
 
 int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
 {
-    int proto_cmp, a_active, b_active;
+    int proto_cmp, username_cmp, a_active, b_active;
 
     // Big-endian versus little-endian doesn't matter here.
     // We just want a consistent ordering on the same machine.
@@ -373,6 +387,10 @@ int hrpc_conn_compare(const struct hrpc_
     proto_cmp = strcmp(a->protocol, b->protocol);
     if (proto_cmp != 0)
         return proto_cmp;
+    // Compare username.
+    username_cmp = strcmp(a->username, b->username);
+    if (username_cmp != 0)
+        return username_cmp;
     // Make the inactive connections sort before the active ones.
     a_active = !!a->call;
     b_active = !!b->call;
@@ -390,7 +408,8 @@ int hrpc_conn_compare(const struct hrpc_
 }
 
 int hrpc_conn_usable(const struct hrpc_conn *conn,
-                      const struct sockaddr_in *addr, const char *protocol)
+            const struct sockaddr_in *addr,
+            const char *protocol, const char *username)
 {
     if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
         return 0;
@@ -398,6 +417,8 @@ int hrpc_conn_usable(const struct hrpc_c
         return 0;
     else if (strcmp(conn->protocol, protocol))
         return 0;
+    else if (strcmp(conn->username, username))
+        return 0;
     return 1;
 }
 
@@ -420,14 +441,14 @@ static struct hadoop_err *conn_deliver_r
     // Check if the server sent us a bogus payload_len value.
     if (payload_len < 0) {
         return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
-            "server's payload_len was %"PRId32", but negative payload "
+            "server's payload_len was %" PRId32 ", but negative payload "
             "lengths are not valid.", payload_len);
     }
     payload_end = off;
     payload_end += payload_len;
     if (payload_end > reader->body_len) {
         return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
-            "server's payload_len was %"PRId64", but there are only %d "
+            "server's payload_len was %" PRId64 ", but there are only %d "
             "bytes left in the body buffer.", payload_end, reader->body_len);
     }
     // Reset the connection's read state.  We'll hold on to the response buffer
@@ -461,13 +482,13 @@ static struct hadoop_err *conn_process_r
     }
     if (resp_header_len <= 0) {
         err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
-                "invalid resp_header_len of %"PRId32, resp_header_len);
+                "invalid resp_header_len of %" PRId32, resp_header_len);
         goto done;
     }
     rem = reader->body_len - off;
     if (resp_header_len > rem) {
         err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
-                "resp_header_len of %"PRId32", but there were only %"PRId32
+                "resp_header_len of %" PRId32 ", but there were only %" PRId32
                 " bytes left in the response.", resp_header_len, rem);
         goto done;
     }
@@ -479,6 +500,14 @@ static struct hadoop_err *conn_process_r
         goto done;
     }
     off += resp_header_len;
+    if (resp_header->callid != conn->call_id) {
+        // We currently only send one request at a time.  So when we get a
+        // response, we expect it to be for the request we just sent.
+        err = hadoop_lerr_alloc(EIO, "conn_process_response: incorrect call "
+                "id in response.  Expected %" PRId32 ", got %" PRId32 ".",
+                conn->call_id, resp_header->callid);
+        goto done;
+    }
     if (resp_header->status != RPC_STATUS_PROTO__SUCCESS) {
         // TODO: keep connection open if we got an ERROR rather than a FATAL.
         err = hadoop_lerr_alloc(EIO, "conn_process_response: error %s: %s",
@@ -546,8 +575,8 @@ static void hrpc_conn_read_alloc(uv_hand
         rem = reader->body_len - reader->off;
         if (rem <= 0) {
             conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
-                    "HRPC_CONN_READ_BODY with reader->body_len = %"PRId32", but "
-                    "reader->off = %"PRId32"\n", reader->body_len, reader->off);
+                    "HRPC_CONN_READ_BODY with reader->body_len = %" PRId32 ", but "
+                    "reader->off = %" PRId32 "\n", reader->body_len, reader->off);
             buf->base = NULL;
             buf->len = 0;
             return;
@@ -589,7 +618,7 @@ static void conn_read_cb(uv_stream_t *st
         reader->off += nread;
         if (reader->off < READLEN_BUF_LEN) {
             conn_log_debug(conn, "conn_read_cb: got partial read of "
-                           "body_len.  reader->off = %"PRId32"\n",
+                           "body_len.  reader->off = %" PRId32 "\n",
                            reader->off);
             return;
         }
@@ -597,12 +626,12 @@ static void conn_read_cb(uv_stream_t *st
         if ((reader->body_len <= 0) ||
                 (reader->body_len > MAX_READER_BODY_LEN)) {
             hrpc_conn_destroy(conn, hadoop_lerr_alloc(EIO, 
-                "conn_read_cb: got an invalid body length of %"PRId32"\n", 
+                "conn_read_cb: got an invalid body length of %" PRId32 "\n", 
                 reader->body_len));
             return;
         }
         conn_log_debug(conn, "conn_read_cb: got body length of "
-                       "%"PRId32".  Transitioning to HRPC_CONN_READ_BODY.\n",
+                       "%" PRId32 ".  Transitioning to HRPC_CONN_READ_BODY.\n",
                        reader->body_len);
         reader->off = 0;
         reader->state = HRPC_CONN_READ_BODY;
@@ -610,15 +639,15 @@ static void conn_read_cb(uv_stream_t *st
         if (!reader->body) {
             hrpc_conn_destroy(conn, hadoop_lerr_alloc(ENOMEM,
                     "hrpc_conn_read_alloc: failed to allocate "
-                    "%"PRId32" bytes.\n", reader->body_len));
+                    "%" PRId32 " bytes.\n", reader->body_len));
         }
         break;
     case HRPC_CONN_READ_BODY:
         reader->off += nread;
         if (reader->off < reader->body_len) {
             conn_log_debug(conn, "conn_read_cb: got partial read of "
-                           "body.  reader->off = %"PRId32" out of %"PRId32"\n",
-                           reader->off, reader->body_len);
+                           "body.  reader->off = %" PRId32 " out of %"
+                           PRId32 "\n", reader->off, reader->body_len);
             return;
         }
         err = conn_process_response(conn);
@@ -663,6 +692,7 @@ void hrpc_conn_destroy(struct hrpc_conn 
     free_write_bufs(conn);
     conn->writer.state = HRPC_CONN_WRITE_CLOSED;
     free(conn->protocol);
+    free(conn->username);
     uv_close((uv_handle_t*)&conn->stream, conn_free);
 }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h Thu May 15 20:08:36 2014
@@ -134,6 +134,11 @@ struct hrpc_conn {
     char *protocol;
 
     /**
+     * The Hadoop username we're connected as.  Malloc'ed.
+     */
+    char *username;
+
+    /**
      * The client ID we used when establishing the connection.
      */
     struct hrpc_client_id client_id;  
@@ -143,6 +148,11 @@ struct hrpc_conn {
      */
     uv_connect_t conn_req;
 
+    /**
+     * The current call ID.
+     */
+    uint32_t call_id;
+
     struct hrpc_conn_writer writer;
     struct hrpc_conn_reader reader;
 };
@@ -198,11 +208,13 @@ int hrpc_conn_compare(const struct hrpc_
  * @param conn          The connection.
  * @param addr          The address.
  * @param protocol      The protocol.
+ * @param username      The username.
  *
  * @return              1 if the connection is usable; 0 if not.
  */
 int hrpc_conn_usable(const struct hrpc_conn *conn,
-                      const struct sockaddr_in *addr, const char *protocol);
+                      const struct sockaddr_in *addr,
+                      const char *protocol, const char *username);
 
 /**
  * Destroy a connection.

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c Thu May 15 20:08:36 2014
@@ -24,6 +24,7 @@
 #include "rpc/varint.h"
 
 #include <errno.h>
+#include <inttypes.h>
 #include <netinet/in.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -43,131 +44,22 @@
  */
 #define MAX_SEND_LEN (63 * 1024 * 1024)
 
-struct hrpc_proxy {
-    /**
-     * The messenger that this proxy is associated with.
-     */
-    struct hrpc_messenger *msgr;
-
-    /**
-     * Dynamically allocated string describing the protocol this proxy speaks. 
-     */
-    char *protocol;
-
-    /**
-     * The current call.
-     */
-    struct hrpc_call call;
-
-    /**
-     * A memory area which can be used by the current call.
-     *
-     * This will be null if userdata_len is 0.
-     */
-    uint8_t *userdata;
-
-    /**
-     * Length of userdata.
-     */
-    size_t userdata_len;
-};
-
 struct hrpc_proxy_builder {
     struct hrpc_proxy *proxy;
 };
 
 static const char OOM_ERROR[] = "OOM";
 
-struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
-            struct hrpc_messenger *msgr)
-{
-    struct hrpc_proxy_builder *bld;
-
-    bld = calloc(1, sizeof(struct hrpc_proxy_builder));
-    if (!bld)
-        return NULL;
-    bld->proxy = calloc(1, sizeof(struct hrpc_proxy));
-    if (!bld->proxy) {
-        free(bld);
-        return NULL;
-    }
-    bld->proxy->msgr = msgr;
-    bld->proxy->call.remote.sin_addr.s_addr = INADDR_ANY;
-
-    return bld;
-}
-
-void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld)
-{
-    if (!bld)
-        return;
-    free(bld->proxy);
-    free(bld);
-}
-
-void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
-                                     const char *protocol)
-{
-    struct hrpc_proxy *proxy = bld->proxy;
-
-    if (proxy->protocol) {
-        if (proxy->protocol != OOM_ERROR) {
-            free(proxy->protocol);
-        }
-        proxy->protocol = NULL;
-    }
-    proxy->protocol = strdup(protocol);
-    if (!proxy->protocol) {
-        proxy->protocol = (char*)OOM_ERROR;
-    }
-}
-
-void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld,
-                                     const struct sockaddr_in *remote)
-{
-    bld->proxy->call.remote = *remote;
-}
-
-struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
-                            struct hrpc_proxy **out)
-{
-    struct hrpc_proxy *proxy;
-    
-    proxy = bld->proxy;
-    free(bld);
-    //fprintf(stderr, "proxy = %p, proxy->protocol = %s, proxy->call.cb = %p\n", proxy, proxy->protocol, proxy->call.cb);
-    if (proxy->call.remote.sin_addr.s_addr == INADDR_ANY) {
-        hrpc_proxy_free(proxy);
-        return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: you must specify "
-                                 "a remote.");
-    }
-    if (!proxy->protocol) {
-        hrpc_proxy_free(proxy);
-        return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: can't create "
-                                 "a proxy without a protocol argument.");
-    } else if (proxy->protocol == OOM_ERROR) {
-        // There was an OOM error during hrpc_proxy_builder_set_protocol.
-        hrpc_proxy_free(proxy);
-        return hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_create: OOM error.");
-    }
-    *out = proxy;
-    return NULL;
-}
-
-void hrpc_proxy_free(struct hrpc_proxy *proxy)
-{
-    if (!proxy)
-        return;
-    if (hrpc_call_is_active(&proxy->call)) {
-        proxy_log_warn(proxy, "%s", "hrpc_proxy_free: attempt to free a proxy "
-                       "which is currently active!\n");
-        return;
-    }
-    if (proxy->protocol != OOM_ERROR) {
-        free(proxy->protocol);
-    }
-    free(proxy->userdata);
-    free(proxy);
+void hrpc_proxy_init(struct hrpc_proxy *proxy,
+            struct hrpc_messenger *msgr,
+            const struct sockaddr_in *remote,
+            const char *protocol, const char *username)
+{
+    memset(proxy, 0, sizeof(*proxy));
+    proxy->msgr = msgr;
+    proxy->protocol = protocol;
+    proxy->username = username;
+    proxy->call.remote = *remote;
 }
 
 struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy)
@@ -191,13 +83,8 @@ void hrpc_proxy_deactivate(struct hrpc_p
 
 void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size)
 {
-    if (size > proxy->userdata_len) {
-        uint8_t *new_userdata = realloc(proxy->userdata, size);
-        if (!new_userdata) {
-            return NULL;
-        }
-        proxy->userdata = new_userdata;
-        proxy->userdata_len = size;
+    if (size > RPC_PROXY_USERDATA_MAX) {
+        return NULL;
     }
     return proxy->userdata;
 }
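
Illustrative usage (not part of the commit): with the fixed in-struct buffer,
per-call scratch allocation can no longer fail from heap exhaustion, only by
exceeding the compile-time bound:

    struct hrpc_sync_ctx *ctx =
        hrpc_proxy_alloc_userdata(proxy, sizeof(struct hrpc_sync_ctx));

    if (!ctx) {
        /* sizeof(struct hrpc_sync_ctx) exceeds RPC_PROXY_USERDATA_MAX */
    }
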
@@ -244,15 +131,11 @@ void hrpc_proxy_start(struct hrpc_proxy 
 
     call->cb = cb;
     call->cb_data = cb_data;
-    call->protocol = strdup(proxy->protocol);
-    if (!call->protocol) {
-        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ENOMEM,
-                "hrpc_proxy_start_internal: out of memory"));
-        return;
-    }
+    call->protocol = proxy->protocol;
+    call->username = proxy->username;
 
     req_header.methodname = (char*)method;
-    req_header.declaringclassprotocolname = proxy->protocol;
+    req_header.declaringclassprotocolname = (char*)proxy->protocol;
     req_header.clientprotocolversion = 1;
     req_header_len = request_header_proto__get_packed_size(&req_header);
     buf_len = varint32_size(req_header_len);
@@ -262,7 +145,7 @@ void hrpc_proxy_start(struct hrpc_proxy 
     if (buf_len >= MAX_SEND_LEN) {
         hrpc_call_deliver_err(call,
             hadoop_lerr_alloc(EINVAL, "hrpc_proxy_setup_header: the "
-                "request length is too long at %"PRId64 " bytes.  The "
+                "request length is too long at %" PRId64 " bytes.  The "
                 "maximum we will send is %d bytes.", buf_len, MAX_SEND_LEN));
         return;
     }
@@ -270,7 +153,7 @@ void hrpc_proxy_start(struct hrpc_proxy 
     if (!buf) {
         hrpc_call_deliver_err(call, 
             hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_setup_header: "
-                "failed to allocate a buffer of length %"PRId64" bytes.",
+                "failed to allocate a buffer of length %" PRId64 " bytes.",
                 buf_len));
         return;
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h Thu May 15 20:08:36 2014
@@ -19,13 +19,13 @@
 #ifndef HADOOP_CORE_RPC_PROXY_H
 #define HADOOP_CORE_RPC_PROXY_H
 
+#include "rpc/call.h"
+
 #include <stdint.h> /* for uint8_t */
 #include <uv.h> /* for uv_buf_t */
 
 struct hadoop_err;
 struct hrpc_messenger;
-struct hrpc_proxy;
-struct hrpc_proxy_builder;
 
 struct hrpc_response {
     uint8_t *pb_base;
@@ -39,65 +39,58 @@ struct hrpc_sync_ctx {
     struct hrpc_response resp;
 };
 
-typedef void (*hrpc_raw_cb_t)(struct hrpc_response *,
-    struct hadoop_err *, void *);
-
 typedef size_t (*hrpc_pack_cb_t)(const void *, uint8_t *);
 
-/**
- * Allocate a Hadoop proxy builder.
- *
- * @param msgr      The Hadoop messenger that this proxy will be associated
- *                    with.
- * @return          A Hadoop proxy builder, or NULL on OOM.
- */
-struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
-                struct hrpc_messenger *msgr);
-
-/**
- * Free a Hadoop proxy builder.
- *
- * @param bld       The Hadoop proxy builder to free.
- */
-void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld);
-
-/**
- * Set the protocol used by a proxy.
- *
- * @param bld       The Hadoop proxy builder.
- * @param proto     The protocol string to use.  Will be deep-copied.
- */
-void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
-                                     const char *proto);
-
-/**
- * Set the remote that the proxy should connect to.
- *
- * @param bld       The Hadoop proxy builder.
- * @param remote    The remote.  Will be deep-copied.
- */
-void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld,
-                                   const struct sockaddr_in *remote);
+#define RPC_PROXY_USERDATA_MAX 64
 
-/**
- * Create a Hadoop proxy
- *
- * @param bld       The Hadoop proxy builder to use.
- *                      The builder will be freed, even on failure.
- * @param out       (out param) On success, the Hadoop proxy.
- *
- * @return          On success, NULL.  On error, the error message.
- */
-struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
-                            struct hrpc_proxy **out);
+struct hrpc_proxy {
+    /**
+     * The messenger that this proxy is associated with.
+     */
+    struct hrpc_messenger *msgr;
+
+    /**
+     * String describing the protocol this proxy speaks. 
+     */
+    const char *protocol;
+
+    /**
+     * String describing the username this proxy uses. 
+     */
+    const char *username;
+
+    /**
+     * The current call.
+     */
+    struct hrpc_call call;
+
+    /**
+     * A memory area which can be used by the current call.
+     *
+     * A fixed-size scratch area of RPC_PROXY_USERDATA_MAX bytes.
+     */
+    uint8_t userdata[RPC_PROXY_USERDATA_MAX];
+};
 
 /**
- * Free a Hadoop proxy.
+ * Initialize a Hadoop proxy.
  *
- * @param proxy     The Hadoop proxy to free.  You must not attempt to free a
- *                      proxy with a call in progress.
- */
-void hrpc_proxy_free(struct hrpc_proxy *proxy);
+ * @param proxy     The Hadoop proxy to initialize.
+ * @param msgr      The messenger to associate the proxy with.
+ *                      This messenger must not be de-allocated while the proxy
+ *                      still exists.
+ * @param remote    The remote to contact.  Will be copied.
+ * @param protocol  The protocol to use for this proxy.
+ *                      This string must remain valid for the lifetime of the
+ *                      proxy.
+ * @param username  The username to use.
+ *                      This string must remain valid for the lifetime of the
+ *                      proxy.
+ */
+void hrpc_proxy_init(struct hrpc_proxy *proxy,
+            struct hrpc_messenger *msgr,
+            const struct sockaddr_in *remote,
+            const char *protocol, const char *username);
 
 /**
  * Mark the proxy as active.
@@ -160,6 +153,9 @@ void hrpc_proxy_sync_cb(struct hrpc_resp
  *
  * This method will return after queuing up the RPC to be sent.
  *
+ * Note: after the proxy has been started, you may __not__ de-allocate the
+ * proxy until the callback has happened.
+ *
  * @param proxy                 The Hadoop proxy to use.  A single proxy can
  *                                  only make one call at once.
  * @param method                The method we're calling.
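
Taken together with the namenode-rpc-unit.c changes above, the new
stack-allocated lifecycle looks roughly like this (a sketch; msgr, remote,
and username are assumed to outlive the proxy, per the hrpc_proxy_init
contract):

    struct hrpc_proxy proxy;

    hrpc_proxy_init(&proxy, msgr, &remote,
            "org.apache.hadoop.hdfs.protocol.ClientProtocol", username);
    /* ... queue calls through the proxy and wait for their callbacks ... */
    /* No hrpc_proxy_free(): the proxy owns no heap memory, but it must stay
     * live until the last call's callback has fired. */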

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c Thu May 15 20:08:36 2014
@@ -67,11 +67,11 @@ static struct hrpc_conn *reuse_idle_conn
 
     memset(&exemplar, 0, sizeof(exemplar));
     exemplar.remote = *remote;
-    exemplar.protocol = call->protocol;
+    exemplar.protocol = (char*)call->protocol;
     conn = RB_NFIND(hrpc_conns, &reactor->conns, &exemplar);
     if (!conn)
         return NULL;
-    if (hrpc_conn_usable(conn, remote, call->protocol)) {
+    if (hrpc_conn_usable(conn, remote, call->protocol, call->username)) {
         if (conn->writer.state == HRPC_CONN_WRITE_IDLE) {
             RB_REMOVE(hrpc_conns, &reactor->conns, conn);
             return conn;

