Author: cmccabe Date: Thu May 15 20:08:36 2014 New Revision: 1595028 URL: http://svn.apache.org/r1595028 Log: HADOOP-10564. Add username to native RPCv9 client (cmccabe)
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt Thu May 15 20:08:36 2014 @@ -55,6 +55,7 @@ set(COMMON_SRCS common/net.c common/string.c common/test.c + common/user.c ) set(COMMON_DEPS pthread Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c?rev=1595028&view=auto ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c (added) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c Thu May 15 20:08:36 2014 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common/user.h" + +#include <errno.h> +#include <pwd.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <unistd.h> + +#define GETPWUID_R_BUFLEN_MIN 4096 +#define GETPWUID_R_BUFLEN_MAX (8 * 1024 * 1024) + +/** + * Get the user name associated with a given numeric user ID. + * + * The POSIX APIs to do this require a little more code than you might expect. + * We have to supply a buffer to the getpwuid_r call. But how big should it + * be? We can call sysconf to get a "suggested size," but this may or may not + * be big enough for our particular user. Also, on some platforms, sysconf + * returns -1 for this. So we have a loop which keeps doubling the size of the + * buffer. As a sanity check against buggy libraries, we give up when we reach + * some ridiculous buffer size. + * + * @param uid UID to look up + * @param out (out param) on success, the user name. + * + * @return 0 on success; error code on failure. 
+ */
+static int uid_to_string(uid_t uid, char **out)
+{
+    int ret;
+    struct passwd pwd, *result;
+    char *buf = NULL, *nbuf;
+    size_t buflen, nbuflen;
+    long suggested;
+
+    // sysconf returns a signed long, and -1 when the platform has no
+    // suggested size.  Test it while it is still signed: storing -1 directly
+    // into a size_t yields SIZE_MAX, which skips the minimum clamp below and
+    // makes the very first realloc fail with ENOMEM.
+    suggested = sysconf(_SC_GETPW_R_SIZE_MAX);
+    if (suggested < GETPWUID_R_BUFLEN_MIN) {
+        buflen = GETPWUID_R_BUFLEN_MIN;
+    } else if (suggested > GETPWUID_R_BUFLEN_MAX) {
+        buflen = GETPWUID_R_BUFLEN_MAX;
+    } else {
+        buflen = (size_t)suggested;
+    }
+    while (1) {
+        // Grow through a temporary so that buf is still freed at 'done' if
+        // realloc fails.
+        nbuf = realloc(buf, buflen);
+        if (!nbuf) {
+            ret = ENOMEM;
+            goto done;
+        }
+        buf = nbuf;
+        ret = getpwuid_r(uid, &pwd, buf, buflen, &result);
+        if (ret == 0)
+            break;
+        if (ret != ERANGE) {
+            fprintf(stderr, "geteuid_string: getpwuid_r(%lld) failed "
+                    "with error %d\n", (long long)uid, ret);
+            goto done;
+        }
+        // ERANGE: the buffer was too small.  Double it, up to the cap.
+        nbuflen = buflen * 2;
+        if (nbuflen > GETPWUID_R_BUFLEN_MAX) {
+            nbuflen = GETPWUID_R_BUFLEN_MAX;
+        }
+        if (buflen == nbuflen) {
+            // Already at the cap and still too small: give up.
+            fprintf(stderr, "geteuid_string: getpwuid_r(%lld) still gets "
+                    "ERANGE with buflen %zu\n", (long long)uid, buflen);
+            ret = ERANGE;
+            goto done;
+        }
+        buflen = nbuflen;
+    }
+    // getpwuid_r returned 0; a NULL result means no entry matched the uid.
+    if (!result) {
+        ret = ENOENT;
+        fprintf(stderr, "geteuid_string: getpwuid_r(%lld): no name "
+                "found for current effective user id.\n", (long long)uid);
+        goto done;
+    }
+    // pw_name points into buf, which is freed below, so hand back a copy.
+    *out = strdup(result->pw_name);
+    if (!*out) {
+        ret = ENOMEM;
+        goto done;
+    }
+
+done:
+    free(buf);
+    return ret;
+}
+
+int geteuid_string(char **out)
+{
+    return uid_to_string(geteuid(), out);
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h?rev=1595028&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h Thu May 15 20:08:36 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef HADOOP_CORE_COMMON_USER_H +#define HADOOP_CORE_COMMON_USER_H + +/** + * Get the current effective user name. + * + * @param out (out param) The name of the current user. + * A malloc'ed string which must be freed. + * + * @return 0 on success; error code otherwise. + */ +int geteuid_string(char **out); + +#endif + +// vim: ts=4:sw=4:tw=79:et Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c Thu May 15 20:08:36 2014 @@ -18,6 +18,7 @@ #include "common/hadoop_err.h" #include "common/test.h" +#include "common/user.h" #include "protobuf/ClientNamenodeProtocol.call.h" #include "rpc/messenger.h" #include "rpc/proxy.h" @@ -31,12 +32,14 @@ struct options { struct sockaddr_in remote; + char *username; }; static void options_from_env(struct options *opts) { const char *ip_str; const char *port_str; + const char *username; int 
res, port; ip_str = getenv("HDFS_IP"); @@ -58,6 +61,19 @@ static void options_from_env(struct opti ip_str, port, uv_strerror(res)); exit(EXIT_FAILURE); } + username = getenv("HDFS_USERNAME"); + if (username) { + opts->username = strdup(username); + if (!opts->username) + abort(); + fprintf(stderr, "using HDFS username %s\n", username); + } else { + res = geteuid_string(&opts->username); + if (res) { + fprintf(stderr, "geteuid_string failed with error %d\n", res); + abort(); + } + } } void set_replication_cb(SetReplicationResponseProto *resp, @@ -87,9 +103,8 @@ void set_replication_cb(SetReplicationRe int main(void) { struct hrpc_messenger_builder *msgr_bld; - struct hrpc_proxy_builder *proxy_bld; - struct hrpc_proxy *proxy; struct hrpc_messenger *msgr; + struct hrpc_proxy proxy; struct options opts; uv_sem_t sem; @@ -98,26 +113,24 @@ int main(void) msgr_bld = hrpc_messenger_builder_alloc(); EXPECT_NONNULL(msgr_bld); EXPECT_NO_HADOOP_ERR(hrpc_messenger_create(msgr_bld, &msgr)); - proxy_bld = hrpc_proxy_builder_alloc(msgr); - EXPECT_NONNULL(proxy_bld); - hrpc_proxy_builder_set_remote(proxy_bld, &opts.remote); - hrpc_proxy_builder_set_protocol(proxy_bld, - "org.apache.hadoop.hdfs.protocol.ClientProtocol"); - EXPECT_NO_HADOOP_ERR(hrpc_proxy_create(proxy_bld, &proxy)); + + hrpc_proxy_init(&proxy, msgr, &opts.remote, + "org.apache.hadoop.hdfs.protocol.ClientProtocol", + opts.username); EXPECT_INT_ZERO(uv_sem_init(&sem, 0)); { SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT; req.src = "/foo2"; req.replication = 2; - cnn_async_set_replication(proxy, &req, set_replication_cb, &sem); + cnn_async_set_replication(&proxy, &req, set_replication_cb, &sem); } uv_sem_wait(&sem); - hrpc_proxy_free(proxy); hrpc_messenger_shutdown(msgr); hrpc_messenger_free(msgr); uv_sem_destroy(&sem); + free(opts.username); return EXIT_SUCCESS; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h Thu May 15 20:08:36 2014 @@ -20,12 +20,14 @@ #define HADOOP_CORE_RPC_CALL_H #include "common/queue.h" -#include "rpc/proxy.h" // for hrpc_raw_cb_t #include <netinet/in.h> // for struct sockaddr_in #include <stdint.h> // for int32_t #include <uv.h> // for uv_buf_t +struct hadoop_err; +struct hrpc_response; + /** * The Hadoop call. * @@ -33,7 +35,8 @@ * include. */ -struct hrpc_call; +typedef void (*hrpc_raw_cb_t)(struct hrpc_response *, + struct hadoop_err *, void *); /** * A Hadoop RPC call. @@ -61,14 +64,19 @@ struct hrpc_call { void *cb_data; /** - * Malloc'ed payload to send. Malloced. + * Malloc'ed payload to send. */ uv_buf_t payload; /** - * The Java protocol we're using. Malloced. + * String describing the protocol this call is using. + */ + const char *protocol; + + /** + * String describing the username this call is using. */ - char *protocol; + const char *username; /** * Nonzero if the call is currently active. 
Must be set using atomic Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c Thu May 15 20:08:36 2014 @@ -24,10 +24,12 @@ #include "rpc/call.h" #include "rpc/conn.h" #include "rpc/messenger.h" +#include "rpc/proxy.h" #include "rpc/reactor.h" #include "rpc/varint.h" #include <errno.h> +#include <inttypes.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -58,6 +60,8 @@ static const uint8_t FRAME[] = { #define FRAME_LEN sizeof(FRAME) +#define CONNECTION_CONTEXT_CALL_ID (-3) + static void conn_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t* buf); static void hrpc_conn_read_alloc(uv_handle_t *handle, @@ -98,6 +102,7 @@ static struct hadoop_err *conn_setup_ipc struct hrpc_conn_writer *writer = &conn->writer; IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT; RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT; + UserInformationProto userinfo = USER_INFORMATION_PROTO__INIT; int32_t cset_len, buf_len; int32_t ipc_ctx_len, rpc_req_header_len, off = 0; uint8_t *buf; @@ -107,14 +112,15 @@ static struct hadoop_err *conn_setup_ipc rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER; rpc_req_header.has_rpcop = 1; rpc_req_header.rpcop = RPC_FINAL_PACKET; - rpc_req_header.callid = -3; // ??? 
+ rpc_req_header.callid = CONNECTION_CONTEXT_CALL_ID; rpc_req_header.clientid.data = conn->client_id.buf; rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN; rpc_req_header.has_retrycount = 0; rpc_req_header_len = rpc_request_header_proto__get_packed_size(&rpc_req_header); - - ipc_ctx.userinfo = NULL; + userinfo.effectiveuser = conn->username; + userinfo.realuser = NULL; + ipc_ctx.userinfo = &userinfo; ipc_ctx.protocol = conn->protocol; ipc_ctx_len = ipc_connection_context_proto__get_packed_size(&ipc_ctx); @@ -156,7 +162,7 @@ static struct hadoop_err *hrpc_conn_setu rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER; rpc_req_header.has_rpcop = 1; rpc_req_header.rpcop = RPC_FINAL_PACKET; - rpc_req_header.callid = 0; // ??? + rpc_req_header.callid = conn->call_id; rpc_req_header.clientid.data = conn->client_id.buf; rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN; rpc_req_header.has_retrycount = 0; @@ -168,7 +174,7 @@ static struct hadoop_err *hrpc_conn_setu total_len64 += call->payload.len; if (total_len64 > MAX_READER_BODY_LEN) { err = hadoop_lerr_alloc(EINVAL, "hrpc_conn_setup_payload: " - "can't send a payload of length %"PRId64". The maximum " + "can't send a payload of length %" PRId64 ". The maximum " "payload length is %d", total_len64, MAX_READER_BODY_LEN); return err; } @@ -255,6 +261,12 @@ static void conn_start_outbound(struct h struct hrpc_conn_writer *writer = &conn->writer; int res; + // Get next call ID to use. 
+ if (conn->call_id >= 0x7fffffff) { + conn->call_id = 1; + } else { + conn->call_id++; + } writer->state = HRPC_CONN_WRITE_IPC_HEADER; err = conn_setup_ipc_header(conn); if (err) { @@ -317,7 +329,8 @@ struct hadoop_err *hrpc_conn_create_outb conn->call = call; conn->remote = call->remote; conn->protocol = strdup(call->protocol); - if (!conn->protocol) { + conn->username = strdup(call->username); + if ((!conn->protocol) || (!conn->username)) { err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM"); goto done; } @@ -344,6 +357,7 @@ done: if (err) { if (conn) { free(conn->protocol); + free(conn->username); if (tcp_init) { uv_close((uv_handle_t*)&conn->stream, NULL); } @@ -357,7 +371,7 @@ done: int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b) { - int proto_cmp, a_active, b_active; + int proto_cmp, username_cmp, a_active, b_active; // Big-endian versus little-endian doesn't matter here. // We just want a consistent ordering on the same machine. @@ -373,6 +387,10 @@ int hrpc_conn_compare(const struct hrpc_ proto_cmp = strcmp(a->protocol, b->protocol); if (proto_cmp != 0) return proto_cmp; + // Compare username. + username_cmp = strcmp(a->username, b->username); + if (username_cmp != 0) + return username_cmp; // Make the inactive connections sort before the active ones. 
a_active = !!a->call; b_active = !!b->call; @@ -390,7 +408,8 @@ int hrpc_conn_compare(const struct hrpc_ } int hrpc_conn_usable(const struct hrpc_conn *conn, - const struct sockaddr_in *addr, const char *protocol) + const struct sockaddr_in *addr, + const char *protocol, const char *username) { if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr) return 0; @@ -398,6 +417,8 @@ int hrpc_conn_usable(const struct hrpc_c return 0; else if (strcmp(conn->protocol, protocol)) return 0; + else if (strcmp(conn->username, username)) + return 0; return 1; } @@ -420,14 +441,14 @@ static struct hadoop_err *conn_deliver_r // Check if the server sent us a bogus payload_len value. if (payload_len < 0) { return hadoop_lerr_alloc(EIO, "conn_deliver_resp: " - "server's payload_len was %"PRId32", but negative payload " + "server's payload_len was %" PRId32 ", but negative payload " "lengths are not valid.", payload_len); } payload_end = off; payload_end += payload_len; if (payload_end > reader->body_len) { return hadoop_lerr_alloc(EIO, "conn_deliver_resp: " - "server's payload_len was %"PRId64", but there are only %d " + "server's payload_len was %" PRId64 ", but there are only %d " "bytes left in the body buffer.", payload_end, reader->body_len); } // Reset the connection's read state. 
We'll hold on to the response buffer @@ -461,13 +482,13 @@ static struct hadoop_err *conn_process_r } if (resp_header_len <= 0) { err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent " - "invalid resp_header_len of %"PRId32, resp_header_len); + "invalid resp_header_len of %" PRId32, resp_header_len); goto done; } rem = reader->body_len - off; if (resp_header_len > rem) { err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent " - "resp_header_len of %"PRId32", but there were only %"PRId32 + "resp_header_len of %" PRId32 ", but there were only %" PRId32 " bytes left in the response.", resp_header_len, rem); goto done; } @@ -479,6 +500,14 @@ static struct hadoop_err *conn_process_r goto done; } off += resp_header_len; + if (resp_header->callid != conn->call_id) { + // We currently only send one request at a time. So when we get a + // response, we expect it to be for the request we just sent. + err = hadoop_lerr_alloc(EIO, "conn_process_response: incorrect call " + "id in response. Expected %" PRId32 ", got %" PRId32 ".", + conn->call_id, resp_header->callid); + goto done; + } if (resp_header->status != RPC_STATUS_PROTO__SUCCESS) { // TODO: keep connection open if we got an ERROR rather than a FATAL. err = hadoop_lerr_alloc(EIO, "conn_process_response: error %s: %s", @@ -546,8 +575,8 @@ static void hrpc_conn_read_alloc(uv_hand rem = reader->body_len - reader->off; if (rem <= 0) { conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state " - "HRPC_CONN_READ_BODY with reader->body_len = %"PRId32", but " - "reader->off = %"PRId32"\n", reader->body_len, reader->off); + "HRPC_CONN_READ_BODY with reader->body_len = %" PRId32 ", but " + "reader->off = %" PRId32 "\n", reader->body_len, reader->off); buf->base = NULL; buf->len = 0; return; @@ -589,7 +618,7 @@ static void conn_read_cb(uv_stream_t *st reader->off += nread; if (reader->off < READLEN_BUF_LEN) { conn_log_debug(conn, "conn_read_cb: got partial read of " - "body_len. 
reader->off = %"PRId32"\n", + "body_len. reader->off = %" PRId32 "\n", reader->off); return; } @@ -597,12 +626,12 @@ static void conn_read_cb(uv_stream_t *st if ((reader->body_len <= 0) || (reader->body_len > MAX_READER_BODY_LEN)) { hrpc_conn_destroy(conn, hadoop_lerr_alloc(EIO, - "conn_read_cb: got an invalid body length of %"PRId32"\n", + "conn_read_cb: got an invalid body length of %" PRId32 "\n", reader->body_len)); return; } conn_log_debug(conn, "conn_read_cb: got body length of " - "%"PRId32". Transitioning to HRPC_CONN_READ_BODY.\n", + "%" PRId32 ". Transitioning to HRPC_CONN_READ_BODY.\n", reader->body_len); reader->off = 0; reader->state = HRPC_CONN_READ_BODY; @@ -610,15 +639,15 @@ static void conn_read_cb(uv_stream_t *st if (!reader->body) { hrpc_conn_destroy(conn, hadoop_lerr_alloc(ENOMEM, "hrpc_conn_read_alloc: failed to allocate " - "%"PRId32" bytes.\n", reader->body_len)); + "%" PRId32 " bytes.\n", reader->body_len)); } break; case HRPC_CONN_READ_BODY: reader->off += nread; if (reader->off < reader->body_len) { conn_log_debug(conn, "conn_read_cb: got partial read of " - "body. reader->off = %"PRId32" out of %"PRId32"\n", - reader->off, reader->body_len); + "body. 
reader->off = %" PRId32 " out of %" + PRId32 "\n", reader->off, reader->body_len); return; } err = conn_process_response(conn); @@ -663,6 +692,7 @@ void hrpc_conn_destroy(struct hrpc_conn free_write_bufs(conn); conn->writer.state = HRPC_CONN_WRITE_CLOSED; free(conn->protocol); + free(conn->username); uv_close((uv_handle_t*)&conn->stream, conn_free); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h Thu May 15 20:08:36 2014 @@ -134,6 +134,11 @@ struct hrpc_conn { char *protocol; /** + * The Hadoop username we're connected as. Malloc'ed. + */ + char *username; + + /** * The client ID we used when establishing the connection. */ struct hrpc_client_id client_id; @@ -143,6 +148,11 @@ struct hrpc_conn { */ uv_connect_t conn_req; + /** + * The current call ID. + */ + uint32_t call_id; + struct hrpc_conn_writer writer; struct hrpc_conn_reader reader; }; @@ -198,11 +208,13 @@ int hrpc_conn_compare(const struct hrpc_ * @param conn The connection. * @param addr The address. * @param protocol The protocol. + * @param username The username. * * @return 1 if the connection is usable; 0 if not. */ int hrpc_conn_usable(const struct hrpc_conn *conn, - const struct sockaddr_in *addr, const char *protocol); + const struct sockaddr_in *addr, + const char *protocol, const char *username); /** * Destroy a connection. 
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c Thu May 15 20:08:36 2014 @@ -24,6 +24,7 @@ #include "rpc/varint.h" #include <errno.h> +#include <inttypes.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> @@ -43,131 +44,22 @@ */ #define MAX_SEND_LEN (63 * 1024 * 1024) -struct hrpc_proxy { - /** - * The messenger that this proxy is associated with. - */ - struct hrpc_messenger *msgr; - - /** - * Dynamically allocated string describing the protocol this proxy speaks. - */ - char *protocol; - - /** - * The current call. - */ - struct hrpc_call call; - - /** - * A memory area which can be used by the current call. - * - * This will be null if userdata_len is 0. - */ - uint8_t *userdata; - - /** - * Length of userdata. 
- */ - size_t userdata_len; -}; - struct hrpc_proxy_builder { struct hrpc_proxy *proxy; }; static const char OOM_ERROR[] = "OOM"; -struct hrpc_proxy_builder *hrpc_proxy_builder_alloc( - struct hrpc_messenger *msgr) -{ - struct hrpc_proxy_builder *bld; - - bld = calloc(1, sizeof(struct hrpc_proxy_builder)); - if (!bld) - return NULL; - bld->proxy = calloc(1, sizeof(struct hrpc_proxy)); - if (!bld->proxy) { - free(bld); - return NULL; - } - bld->proxy->msgr = msgr; - bld->proxy->call.remote.sin_addr.s_addr = INADDR_ANY; - - return bld; -} - -void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld) -{ - if (!bld) - return; - free(bld->proxy); - free(bld); -} - -void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld, - const char *protocol) -{ - struct hrpc_proxy *proxy = bld->proxy; - - if (proxy->protocol) { - if (proxy->protocol != OOM_ERROR) { - free(proxy->protocol); - } - proxy->protocol = NULL; - } - proxy->protocol = strdup(protocol); - if (!proxy->protocol) { - proxy->protocol = (char*)OOM_ERROR; - } -} - -void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld, - const struct sockaddr_in *remote) -{ - bld->proxy->call.remote = *remote; -} - -struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld, - struct hrpc_proxy **out) -{ - struct hrpc_proxy *proxy; - - proxy = bld->proxy; - free(bld); - //fprintf(stderr, "proxy = %p, proxy->protocol = %s, proxy->call.cb = %p\n", proxy, proxy->protocol, proxy->call.cb); - if (proxy->call.remote.sin_addr.s_addr == INADDR_ANY) { - hrpc_proxy_free(proxy); - return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: you must specify " - "a remote."); - } - if (!proxy->protocol) { - hrpc_proxy_free(proxy); - return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: can't create " - "a proxy without a protocol argument."); - } else if (proxy->protocol == OOM_ERROR) { - // There was an OOM error during hrpc_proxy_builder_set_protocol. 
- hrpc_proxy_free(proxy); - return hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_create: OOM error."); - } - *out = proxy; - return NULL; -} - -void hrpc_proxy_free(struct hrpc_proxy *proxy) -{ - if (!proxy) - return; - if (hrpc_call_is_active(&proxy->call)) { - proxy_log_warn(proxy, "%s", "hrpc_proxy_free: attempt to free a proxy " - "which is currently active!\n"); - return; - } - if (proxy->protocol != OOM_ERROR) { - free(proxy->protocol); - } - free(proxy->userdata); - free(proxy); +void hrpc_proxy_init(struct hrpc_proxy *proxy, + struct hrpc_messenger *msgr, + const struct sockaddr_in *remote, + const char *protocol, const char *username) +{ + memset(proxy, 0, sizeof(*proxy)); + proxy->msgr = msgr; + proxy->protocol = protocol; + proxy->username = username; + proxy->call.remote = *remote; } struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy) @@ -191,13 +83,8 @@ void hrpc_proxy_deactivate(struct hrpc_p void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size) { - if (size > proxy->userdata_len) { - uint8_t *new_userdata = realloc(proxy->userdata, size); - if (!new_userdata) { - return NULL; - } - proxy->userdata = new_userdata; - proxy->userdata_len = size; + if (size > RPC_PROXY_USERDATA_MAX) { + return NULL; } return proxy->userdata; } @@ -244,15 +131,11 @@ void hrpc_proxy_start(struct hrpc_proxy call->cb = cb; call->cb_data = cb_data; - call->protocol = strdup(proxy->protocol); - if (!call->protocol) { - hrpc_call_deliver_err(call, hadoop_lerr_alloc(ENOMEM, - "hrpc_proxy_start_internal: out of memory")); - return; - } + call->protocol = proxy->protocol; + call->username = proxy->username; req_header.methodname = (char*)method; - req_header.declaringclassprotocolname = proxy->protocol; + req_header.declaringclassprotocolname = (char*)proxy->protocol; req_header.clientprotocolversion = 1; req_header_len = request_header_proto__get_packed_size(&req_header); buf_len = varint32_size(req_header_len); @@ -262,7 +145,7 @@ void 
hrpc_proxy_start(struct hrpc_proxy if (buf_len >= MAX_SEND_LEN) { hrpc_call_deliver_err(call, hadoop_lerr_alloc(EINVAL, "hrpc_proxy_setup_header: the " - "request length is too long at %"PRId64 " bytes. The " + "request length is too long at %" PRId64 " bytes. The " "maximum we will send is %d bytes.", buf_len, MAX_SEND_LEN)); return; } @@ -270,7 +153,7 @@ void hrpc_proxy_start(struct hrpc_proxy if (!buf) { hrpc_call_deliver_err(call, hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_setup_header: " - "failed to allocate a buffer of length %"PRId64" bytes.", + "failed to allocate a buffer of length %" PRId64 " bytes.", buf_len)); return; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h Thu May 15 20:08:36 2014 @@ -19,13 +19,13 @@ #ifndef HADOOP_CORE_RPC_PROXY_H #define HADOOP_CORE_RPC_PROXY_H +#include "rpc/call.h" + #include <stdint.h> /* for uint8_t */ #include <uv.h> /* for uv_buf_t */ struct hadoop_err; struct hrpc_messenger; -struct hrpc_proxy; -struct hrpc_proxy_builder; struct hrpc_response { uint8_t *pb_base; @@ -39,65 +39,58 @@ struct hrpc_sync_ctx { struct hrpc_response resp; }; -typedef void (*hrpc_raw_cb_t)(struct hrpc_response *, - struct hadoop_err *, void *); - typedef size_t (*hrpc_pack_cb_t)(const void *, uint8_t *); -/** - * Allocate a Hadoop proxy builder. - * - * @param msgr The Hadoop messenger that this proxy will be associated - * with. - * @return A Hadoop proxy builder, or NULL on OOM. - */ -struct hrpc_proxy_builder *hrpc_proxy_builder_alloc( - struct hrpc_messenger *msgr); - -/** - * Free a Hadoop proxy builder. 
- * - * @param bld The Hadoop proxy builder to free. - */ -void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld); - -/** - * Set the protocol used by a proxy. - * - * @param bld The Hadoop proxy builder. - * @param proto The protocol string to use. Will be deep-copied. - */ -void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld, - const char *proto); - -/** - * Set the remote that the proxy should connect to. - * - * @param bld The Hadoop proxy builder. - * @param remote The remote. Will be deep-copied. - */ -void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld, - const struct sockaddr_in *remote); +#define RPC_PROXY_USERDATA_MAX 64 -/** - * Create a Hadoop proxy - * - * @param bld The Hadoop proxy builder to use. - * The builder will be freed, even on failure. - * @param out (out param) On success, the Hadoop proxy. - * - * @return On success, NULL. On error, the error message. - */ -struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld, - struct hrpc_proxy **out); +struct hrpc_proxy { + /** + * The messenger that this proxy is associated with. + */ + struct hrpc_messenger *msgr; + + /** + * String describing the protocol this proxy speaks. + */ + const char *protocol; + + /** + * String describing the username this proxy uses. + */ + const char *username; + + /** + * The current call. + */ + struct hrpc_call call; + + /** + * A memory area which can be used by the current call. + * + * Fixed-size array embedded in the proxy, holding up to + * RPC_PROXY_USERDATA_MAX bytes. + */ + uint8_t userdata[RPC_PROXY_USERDATA_MAX]; +}; /** - * Free a Hadoop proxy. + * Initialize a Hadoop proxy. * - * @param proxy The Hadoop proxy to free. You must not attempt to free a - * proxy with a call in progress. - */ -void hrpc_proxy_free(struct hrpc_proxy *proxy); + * @param proxy The Hadoop proxy to initialize. + * @param msgr The messenger to associate the proxy with. + * This messenger must not be de-allocated while the proxy + * still exists. 
+ * @param protocol The protocol to use for this proxy. + * This string must remain valid for the lifetime of the + * proxy. + * @param remote The remote to contact. Will be copied. + * @param username The username to use. + * This string must remain valid for the lifetime of the + * proxy. + */ +void hrpc_proxy_init(struct hrpc_proxy *proxy, + struct hrpc_messenger *msgr, + const struct sockaddr_in *remote, + const char *protocol, const char *username); /** * Mark the proxy as active. @@ -160,6 +153,9 @@ void hrpc_proxy_sync_cb(struct hrpc_resp * * This method will return after queuing up the RPC to be sent. * + * Note: after the proxy has been started, you may __not__ de-allocate the + * proxy until the callback has happened. + * * @param proxy The Hadoop proxy to use. A single proxy can * only make one call at once. * @param method The method we're calling. Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c?rev=1595028&r1=1595027&r2=1595028&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c Thu May 15 20:08:36 2014 @@ -67,11 +67,11 @@ static struct hrpc_conn *reuse_idle_conn memset(&exemplar, 0, sizeof(exemplar)); exemplar.remote = *remote; - exemplar.protocol = call->protocol; + exemplar.protocol = (char*)call->protocol; conn = RB_NFIND(hrpc_conns, &reactor->conns, &exemplar); if (!conn) return NULL; - if (hrpc_conn_usable(conn, remote, call->protocol)) { + if (hrpc_conn_usable(conn, remote, call->protocol, call->username)) { if (conn->writer.state == HRPC_CONN_WRITE_IDLE) { RB_REMOVE(hrpc_conns, &reactor->conns, conn); return conn;