Author: cmccabe
Date: Thu May 15 20:08:36 2014
New Revision: 1595028
URL: http://svn.apache.org/r1595028
Log:
HADOOP-10564. Add username to native RPCv9 client (cmccabe)
Added:
hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c
hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h
Modified:
hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt
hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c
hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h
hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c
hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h
hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c
hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h
hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt
(original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/CMakeLists.txt Thu
May 15 20:08:36 2014
@@ -55,6 +55,7 @@ set(COMMON_SRCS
common/net.c
common/string.c
common/test.c
+ common/user.c
)
set(COMMON_DEPS
pthread
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c?rev=1595028&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.c Thu
May 15 20:08:36 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/user.h"
+
+#include <errno.h>
+#include <pwd.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#define GETPWUID_R_BUFLEN_MIN 4096
+#define GETPWUID_R_BUFLEN_MAX (8 * 1024 * 1024)
+
+/**
+ * Get the user name associated with a given numeric user ID.
+ *
+ * The POSIX APIs to do this require a little more code than you might expect.
+ * We have to supply a buffer to the getpwuid_r call. But how big should it
+ * be? We can call sysconf to get a "suggested size," but this may or may not
+ * be big enough for our particular user. Also, on some platforms, sysconf
+ * returns -1 for this. So we have a loop which keeps doubling the size of the
+ * buffer. As a sanity check against buggy libraries, we give up when we reach
+ * some ridiculous buffer size.
+ *
+ * @param uid UID to look up
+ * @param out (out param) on success, the user name.
+ *
+ * @return 0 on success; error code on failure.
+ */
+static int uid_to_string(uid_t uid, char **out)
+{
+ int ret;
+ struct passwd pwd, *result;
+ char *buf = NULL, *nbuf;
+ size_t buflen, nbuflen;
+
+ buflen = sysconf(_SC_GETPW_R_SIZE_MAX);
+ if (buflen < GETPWUID_R_BUFLEN_MIN) {
+ buflen = GETPWUID_R_BUFLEN_MIN;
+ }
+ while (1) {
+ nbuf = realloc(buf, buflen);
+ if (!nbuf) {
+ ret = ENOMEM;
+ goto done;
+ }
+ buf = nbuf;
+ ret = getpwuid_r(uid, &pwd, buf, buflen, &result);
+ if (ret == 0)
+ break;
+ if (ret != ERANGE) {
+ fprintf(stderr, "geteuid_string: getpwuid_r(%lld) failed "
+ "with error %d\n", (long long)uid, ret);
+ goto done;
+ }
+ nbuflen = buflen *2;
+ if (nbuflen > GETPWUID_R_BUFLEN_MAX) {
+ nbuflen = GETPWUID_R_BUFLEN_MAX;
+ }
+ if (buflen == nbuflen) {
+ fprintf(stderr, "geteuid_string: getpwuid_r(%lld) still gets "
+ "ERANGE with buflen %zd\n", (long long)uid, buflen);
+ ret = ERANGE;
+ goto done;
+ }
+ buflen = nbuflen;
+ }
+ if (!result) {
+ ret = ENOENT;
+ fprintf(stderr, "geteuid_string: getpwuid_r(%lld): no name "
+ "found for current effective user id.\n", (long long)uid);
+ goto done;
+ }
+ *out = strdup(result->pw_name);
+ if (!*out) {
+ ret = ENOMEM;
+ goto done;
+ }
+
+done:
+ free(buf);
+ return ret;
+}
+
+int geteuid_string(char **out)
+{
+ return uid_to_string(geteuid(), out);
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h?rev=1595028&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/common/user.h Thu
May 15 20:08:36 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_COMMON_USER_H
+#define HADOOP_CORE_COMMON_USER_H
+
+/**
+ * Get the current effective user name.
+ *
+ * @param out (out param) The name of the current user.
+ * A malloc'ed string which must be freed.
+ *
+ * @return 0 on success; error code otherwise.
+ */
+int geteuid_string(char **out);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et
Modified:
hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
---
hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c
(original)
+++
hadoop/common/branches/HADOOP-10388/hadoop-native-core/hdfs/namenode-rpc-unit.c
Thu May 15 20:08:36 2014
@@ -18,6 +18,7 @@
#include "common/hadoop_err.h"
#include "common/test.h"
+#include "common/user.h"
#include "protobuf/ClientNamenodeProtocol.call.h"
#include "rpc/messenger.h"
#include "rpc/proxy.h"
@@ -31,12 +32,14 @@
struct options {
struct sockaddr_in remote;
+ char *username;
};
static void options_from_env(struct options *opts)
{
const char *ip_str;
const char *port_str;
+ const char *username;
int res, port;
ip_str = getenv("HDFS_IP");
@@ -58,6 +61,19 @@ static void options_from_env(struct opti
ip_str, port, uv_strerror(res));
exit(EXIT_FAILURE);
}
+ username = getenv("HDFS_USERNAME");
+ if (username) {
+ opts->username = strdup(username);
+ if (!opts->username)
+ abort();
+ fprintf(stderr, "using HDFS username %s\n", username);
+ } else {
+ res = geteuid_string(&opts->username);
+ if (res) {
+ fprintf(stderr, "geteuid_string failed with error %d\n", res);
+ abort();
+ }
+ }
}
void set_replication_cb(SetReplicationResponseProto *resp,
@@ -87,9 +103,8 @@ void set_replication_cb(SetReplicationRe
int main(void)
{
struct hrpc_messenger_builder *msgr_bld;
- struct hrpc_proxy_builder *proxy_bld;
- struct hrpc_proxy *proxy;
struct hrpc_messenger *msgr;
+ struct hrpc_proxy proxy;
struct options opts;
uv_sem_t sem;
@@ -98,26 +113,24 @@ int main(void)
msgr_bld = hrpc_messenger_builder_alloc();
EXPECT_NONNULL(msgr_bld);
EXPECT_NO_HADOOP_ERR(hrpc_messenger_create(msgr_bld, &msgr));
- proxy_bld = hrpc_proxy_builder_alloc(msgr);
- EXPECT_NONNULL(proxy_bld);
- hrpc_proxy_builder_set_remote(proxy_bld, &opts.remote);
- hrpc_proxy_builder_set_protocol(proxy_bld,
- "org.apache.hadoop.hdfs.protocol.ClientProtocol");
- EXPECT_NO_HADOOP_ERR(hrpc_proxy_create(proxy_bld, &proxy));
+
+ hrpc_proxy_init(&proxy, msgr, &opts.remote,
+ "org.apache.hadoop.hdfs.protocol.ClientProtocol",
+ opts.username);
EXPECT_INT_ZERO(uv_sem_init(&sem, 0));
{
SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
req.src = "/foo2";
req.replication = 2;
- cnn_async_set_replication(proxy, &req, set_replication_cb, &sem);
+ cnn_async_set_replication(&proxy, &req, set_replication_cb, &sem);
}
uv_sem_wait(&sem);
- hrpc_proxy_free(proxy);
hrpc_messenger_shutdown(msgr);
hrpc_messenger_free(msgr);
uv_sem_destroy(&sem);
+ free(opts.username);
return EXIT_SUCCESS;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/call.h Thu May
15 20:08:36 2014
@@ -20,12 +20,14 @@
#define HADOOP_CORE_RPC_CALL_H
#include "common/queue.h"
-#include "rpc/proxy.h" // for hrpc_raw_cb_t
#include <netinet/in.h> // for struct sockaddr_in
#include <stdint.h> // for int32_t
#include <uv.h> // for uv_buf_t
+struct hadoop_err;
+struct hrpc_response;
+
/**
* The Hadoop call.
*
@@ -33,7 +35,8 @@
* include.
*/
-struct hrpc_call;
+typedef void (*hrpc_raw_cb_t)(struct hrpc_response *,
+ struct hadoop_err *, void *);
/**
* A Hadoop RPC call.
@@ -61,14 +64,19 @@ struct hrpc_call {
void *cb_data;
/**
- * Malloc'ed payload to send. Malloced.
+ * Malloc'ed payload to send.
*/
uv_buf_t payload;
/**
- * The Java protocol we're using. Malloced.
+ * String describing the protocol this call is using.
+ */
+ const char *protocol;
+
+ /**
+ * String describing the username this call is using.
*/
- char *protocol;
+ const char *username;
/**
* Nonzero if the call is currently active. Must be set using atomic
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.c Thu May
15 20:08:36 2014
@@ -24,10 +24,12 @@
#include "rpc/call.h"
#include "rpc/conn.h"
#include "rpc/messenger.h"
+#include "rpc/proxy.h"
#include "rpc/reactor.h"
#include "rpc/varint.h"
#include <errno.h>
+#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -58,6 +60,8 @@ static const uint8_t FRAME[] = {
#define FRAME_LEN sizeof(FRAME)
+#define CONNECTION_CONTEXT_CALL_ID (-3)
+
static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
const uv_buf_t* buf);
static void hrpc_conn_read_alloc(uv_handle_t *handle,
@@ -98,6 +102,7 @@ static struct hadoop_err *conn_setup_ipc
struct hrpc_conn_writer *writer = &conn->writer;
IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT;
RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
+ UserInformationProto userinfo = USER_INFORMATION_PROTO__INIT;
int32_t cset_len, buf_len;
int32_t ipc_ctx_len, rpc_req_header_len, off = 0;
uint8_t *buf;
@@ -107,14 +112,15 @@ static struct hadoop_err *conn_setup_ipc
rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
rpc_req_header.has_rpcop = 1;
rpc_req_header.rpcop = RPC_FINAL_PACKET;
- rpc_req_header.callid = -3; // ???
+ rpc_req_header.callid = CONNECTION_CONTEXT_CALL_ID;
rpc_req_header.clientid.data = conn->client_id.buf;
rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
rpc_req_header.has_retrycount = 0;
rpc_req_header_len =
rpc_request_header_proto__get_packed_size(&rpc_req_header);
-
- ipc_ctx.userinfo = NULL;
+ userinfo.effectiveuser = conn->username;
+ userinfo.realuser = NULL;
+ ipc_ctx.userinfo = &userinfo;
ipc_ctx.protocol = conn->protocol;
ipc_ctx_len = ipc_connection_context_proto__get_packed_size(&ipc_ctx);
@@ -156,7 +162,7 @@ static struct hadoop_err *hrpc_conn_setu
rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
rpc_req_header.has_rpcop = 1;
rpc_req_header.rpcop = RPC_FINAL_PACKET;
- rpc_req_header.callid = 0; // ???
+ rpc_req_header.callid = conn->call_id;
rpc_req_header.clientid.data = conn->client_id.buf;
rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
rpc_req_header.has_retrycount = 0;
@@ -168,7 +174,7 @@ static struct hadoop_err *hrpc_conn_setu
total_len64 += call->payload.len;
if (total_len64 > MAX_READER_BODY_LEN) {
err = hadoop_lerr_alloc(EINVAL, "hrpc_conn_setup_payload: "
- "can't send a payload of length %"PRId64". The maximum "
+ "can't send a payload of length %" PRId64 ". The maximum "
"payload length is %d", total_len64, MAX_READER_BODY_LEN);
return err;
}
@@ -255,6 +261,12 @@ static void conn_start_outbound(struct h
struct hrpc_conn_writer *writer = &conn->writer;
int res;
+ // Get next call ID to use.
+ if (conn->call_id >= 0x7fffffff) {
+ conn->call_id = 1;
+ } else {
+ conn->call_id++;
+ }
writer->state = HRPC_CONN_WRITE_IPC_HEADER;
err = conn_setup_ipc_header(conn);
if (err) {
@@ -317,7 +329,8 @@ struct hadoop_err *hrpc_conn_create_outb
conn->call = call;
conn->remote = call->remote;
conn->protocol = strdup(call->protocol);
- if (!conn->protocol) {
+ conn->username = strdup(call->username);
+ if ((!conn->protocol) || (!conn->username)) {
err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
goto done;
}
@@ -344,6 +357,7 @@ done:
if (err) {
if (conn) {
free(conn->protocol);
+ free(conn->username);
if (tcp_init) {
uv_close((uv_handle_t*)&conn->stream, NULL);
}
@@ -357,7 +371,7 @@ done:
int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
{
- int proto_cmp, a_active, b_active;
+ int proto_cmp, username_cmp, a_active, b_active;
// Big-endian versus little-endian doesn't matter here.
// We just want a consistent ordering on the same machine.
@@ -373,6 +387,10 @@ int hrpc_conn_compare(const struct hrpc_
proto_cmp = strcmp(a->protocol, b->protocol);
if (proto_cmp != 0)
return proto_cmp;
+ // Compare username.
+ username_cmp = strcmp(a->username, b->username);
+ if (username_cmp != 0)
+ return username_cmp;
// Make the inactive connections sort before the active ones.
a_active = !!a->call;
b_active = !!b->call;
@@ -390,7 +408,8 @@ int hrpc_conn_compare(const struct hrpc_
}
int hrpc_conn_usable(const struct hrpc_conn *conn,
- const struct sockaddr_in *addr, const char *protocol)
+ const struct sockaddr_in *addr,
+ const char *protocol, const char *username)
{
if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
return 0;
@@ -398,6 +417,8 @@ int hrpc_conn_usable(const struct hrpc_c
return 0;
else if (strcmp(conn->protocol, protocol))
return 0;
+ else if (strcmp(conn->username, username))
+ return 0;
return 1;
}
@@ -420,14 +441,14 @@ static struct hadoop_err *conn_deliver_r
// Check if the server sent us a bogus payload_len value.
if (payload_len < 0) {
return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
- "server's payload_len was %"PRId32", but negative payload "
+ "server's payload_len was %" PRId32 ", but negative payload "
"lengths are not valid.", payload_len);
}
payload_end = off;
payload_end += payload_len;
if (payload_end > reader->body_len) {
return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
- "server's payload_len was %"PRId64", but there are only %d "
+ "server's payload_len was %" PRId64 ", but there are only %d "
"bytes left in the body buffer.", payload_end, reader->body_len);
}
// Reset the connection's read state. We'll hold on to the response buffer
@@ -461,13 +482,13 @@ static struct hadoop_err *conn_process_r
}
if (resp_header_len <= 0) {
err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
- "invalid resp_header_len of %"PRId32, resp_header_len);
+ "invalid resp_header_len of %" PRId32, resp_header_len);
goto done;
}
rem = reader->body_len - off;
if (resp_header_len > rem) {
err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
- "resp_header_len of %"PRId32", but there were only %"PRId32
+ "resp_header_len of %" PRId32 ", but there were only %" PRId32
" bytes left in the response.", resp_header_len, rem);
goto done;
}
@@ -479,6 +500,14 @@ static struct hadoop_err *conn_process_r
goto done;
}
off += resp_header_len;
+ if (resp_header->callid != conn->call_id) {
+ // We currently only send one request at a time. So when we get a
+ // response, we expect it to be for the request we just sent.
+ err = hadoop_lerr_alloc(EIO, "conn_process_response: incorrect call "
+ "id in response. Expected %" PRId32 ", got %" PRId32 ".",
+ conn->call_id, resp_header->callid);
+ goto done;
+ }
if (resp_header->status != RPC_STATUS_PROTO__SUCCESS) {
// TODO: keep connection open if we got an ERROR rather than a FATAL.
err = hadoop_lerr_alloc(EIO, "conn_process_response: error %s: %s",
@@ -546,8 +575,8 @@ static void hrpc_conn_read_alloc(uv_hand
rem = reader->body_len - reader->off;
if (rem <= 0) {
conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
- "HRPC_CONN_READ_BODY with reader->body_len = %"PRId32",
but "
- "reader->off = %"PRId32"\n", reader->body_len,
reader->off);
+ "HRPC_CONN_READ_BODY with reader->body_len = %" PRId32 ",
but "
+ "reader->off = %" PRId32 "\n", reader->body_len,
reader->off);
buf->base = NULL;
buf->len = 0;
return;
@@ -589,7 +618,7 @@ static void conn_read_cb(uv_stream_t *st
reader->off += nread;
if (reader->off < READLEN_BUF_LEN) {
conn_log_debug(conn, "conn_read_cb: got partial read of "
- "body_len. reader->off = %"PRId32"\n",
+ "body_len. reader->off = %" PRId32 "\n",
reader->off);
return;
}
@@ -597,12 +626,12 @@ static void conn_read_cb(uv_stream_t *st
if ((reader->body_len <= 0) ||
(reader->body_len > MAX_READER_BODY_LEN)) {
hrpc_conn_destroy(conn, hadoop_lerr_alloc(EIO,
- "conn_read_cb: got an invalid body length of %"PRId32"\n",
+ "conn_read_cb: got an invalid body length of %" PRId32 "\n",
reader->body_len));
return;
}
conn_log_debug(conn, "conn_read_cb: got body length of "
- "%"PRId32". Transitioning to HRPC_CONN_READ_BODY.\n",
+ "%" PRId32 ". Transitioning to HRPC_CONN_READ_BODY.\n",
reader->body_len);
reader->off = 0;
reader->state = HRPC_CONN_READ_BODY;
@@ -610,15 +639,15 @@ static void conn_read_cb(uv_stream_t *st
if (!reader->body) {
hrpc_conn_destroy(conn, hadoop_lerr_alloc(ENOMEM,
"hrpc_conn_read_alloc: failed to allocate "
- "%"PRId32" bytes.\n", reader->body_len));
+ "%" PRId32 " bytes.\n", reader->body_len));
}
break;
case HRPC_CONN_READ_BODY:
reader->off += nread;
if (reader->off < reader->body_len) {
conn_log_debug(conn, "conn_read_cb: got partial read of "
- "body. reader->off = %"PRId32" out of %"PRId32"\n",
- reader->off, reader->body_len);
+ "body. reader->off = %" PRId32 " out of %"
+ PRId32 "\n", reader->off, reader->body_len);
return;
}
err = conn_process_response(conn);
@@ -663,6 +692,7 @@ void hrpc_conn_destroy(struct hrpc_conn
free_write_bufs(conn);
conn->writer.state = HRPC_CONN_WRITE_CLOSED;
free(conn->protocol);
+ free(conn->username);
uv_close((uv_handle_t*)&conn->stream, conn_free);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/conn.h Thu May
15 20:08:36 2014
@@ -134,6 +134,11 @@ struct hrpc_conn {
char *protocol;
/**
+ * The Hadoop username we're connected as. Malloc'ed.
+ */
+ char *username;
+
+ /**
* The client ID we used when establishing the connection.
*/
struct hrpc_client_id client_id;
@@ -143,6 +148,11 @@ struct hrpc_conn {
*/
uv_connect_t conn_req;
+ /**
+ * The current call ID.
+ */
+ uint32_t call_id;
+
struct hrpc_conn_writer writer;
struct hrpc_conn_reader reader;
};
@@ -198,11 +208,13 @@ int hrpc_conn_compare(const struct hrpc_
* @param conn The connection.
* @param addr The address.
* @param protocol The protocol.
+ * @param username The username.
*
* @return 1 if the connection is usable; 0 if not.
*/
int hrpc_conn_usable(const struct hrpc_conn *conn,
- const struct sockaddr_in *addr, const char *protocol);
+ const struct sockaddr_in *addr,
+ const char *protocol, const char *username);
/**
* Destroy a connection.
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c
(original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c Thu May
15 20:08:36 2014
@@ -24,6 +24,7 @@
#include "rpc/varint.h"
#include <errno.h>
+#include <inttypes.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
@@ -43,131 +44,22 @@
*/
#define MAX_SEND_LEN (63 * 1024 * 1024)
-struct hrpc_proxy {
- /**
- * The messenger that this proxy is associated with.
- */
- struct hrpc_messenger *msgr;
-
- /**
- * Dynamically allocated string describing the protocol this proxy speaks.
- */
- char *protocol;
-
- /**
- * The current call.
- */
- struct hrpc_call call;
-
- /**
- * A memory area which can be used by the current call.
- *
- * This will be null if userdata_len is 0.
- */
- uint8_t *userdata;
-
- /**
- * Length of userdata.
- */
- size_t userdata_len;
-};
-
struct hrpc_proxy_builder {
struct hrpc_proxy *proxy;
};
static const char OOM_ERROR[] = "OOM";
-struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
- struct hrpc_messenger *msgr)
-{
- struct hrpc_proxy_builder *bld;
-
- bld = calloc(1, sizeof(struct hrpc_proxy_builder));
- if (!bld)
- return NULL;
- bld->proxy = calloc(1, sizeof(struct hrpc_proxy));
- if (!bld->proxy) {
- free(bld);
- return NULL;
- }
- bld->proxy->msgr = msgr;
- bld->proxy->call.remote.sin_addr.s_addr = INADDR_ANY;
-
- return bld;
-}
-
-void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld)
-{
- if (!bld)
- return;
- free(bld->proxy);
- free(bld);
-}
-
-void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
- const char *protocol)
-{
- struct hrpc_proxy *proxy = bld->proxy;
-
- if (proxy->protocol) {
- if (proxy->protocol != OOM_ERROR) {
- free(proxy->protocol);
- }
- proxy->protocol = NULL;
- }
- proxy->protocol = strdup(protocol);
- if (!proxy->protocol) {
- proxy->protocol = (char*)OOM_ERROR;
- }
-}
-
-void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld,
- const struct sockaddr_in *remote)
-{
- bld->proxy->call.remote = *remote;
-}
-
-struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
- struct hrpc_proxy **out)
-{
- struct hrpc_proxy *proxy;
-
- proxy = bld->proxy;
- free(bld);
- //fprintf(stderr, "proxy = %p, proxy->protocol = %s, proxy->call.cb =
%p\n", proxy, proxy->protocol, proxy->call.cb);
- if (proxy->call.remote.sin_addr.s_addr == INADDR_ANY) {
- hrpc_proxy_free(proxy);
- return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: you must specify "
- "a remote.");
- }
- if (!proxy->protocol) {
- hrpc_proxy_free(proxy);
- return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: can't create "
- "a proxy without a protocol argument.");
- } else if (proxy->protocol == OOM_ERROR) {
- // There was an OOM error during hrpc_proxy_builder_set_protocol.
- hrpc_proxy_free(proxy);
- return hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_create: OOM error.");
- }
- *out = proxy;
- return NULL;
-}
-
-void hrpc_proxy_free(struct hrpc_proxy *proxy)
-{
- if (!proxy)
- return;
- if (hrpc_call_is_active(&proxy->call)) {
- proxy_log_warn(proxy, "%s", "hrpc_proxy_free: attempt to free a proxy "
- "which is currently active!\n");
- return;
- }
- if (proxy->protocol != OOM_ERROR) {
- free(proxy->protocol);
- }
- free(proxy->userdata);
- free(proxy);
+void hrpc_proxy_init(struct hrpc_proxy *proxy,
+ struct hrpc_messenger *msgr,
+ const struct sockaddr_in *remote,
+ const char *protocol, const char *username)
+{
+ memset(proxy, 0, sizeof(*proxy));
+ proxy->msgr = msgr;
+ proxy->protocol = protocol;
+ proxy->username = username;
+ proxy->call.remote = *remote;
}
struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy)
@@ -191,13 +83,8 @@ void hrpc_proxy_deactivate(struct hrpc_p
void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size)
{
- if (size > proxy->userdata_len) {
- uint8_t *new_userdata = realloc(proxy->userdata, size);
- if (!new_userdata) {
- return NULL;
- }
- proxy->userdata = new_userdata;
- proxy->userdata_len = size;
+ if (size > RPC_PROXY_USERDATA_MAX) {
+ return NULL;
}
return proxy->userdata;
}
@@ -244,15 +131,11 @@ void hrpc_proxy_start(struct hrpc_proxy
call->cb = cb;
call->cb_data = cb_data;
- call->protocol = strdup(proxy->protocol);
- if (!call->protocol) {
- hrpc_call_deliver_err(call, hadoop_lerr_alloc(ENOMEM,
- "hrpc_proxy_start_internal: out of memory"));
- return;
- }
+ call->protocol = proxy->protocol;
+ call->username = proxy->username;
req_header.methodname = (char*)method;
- req_header.declaringclassprotocolname = proxy->protocol;
+ req_header.declaringclassprotocolname = (char*)proxy->protocol;
req_header.clientprotocolversion = 1;
req_header_len = request_header_proto__get_packed_size(&req_header);
buf_len = varint32_size(req_header_len);
@@ -262,7 +145,7 @@ void hrpc_proxy_start(struct hrpc_proxy
if (buf_len >= MAX_SEND_LEN) {
hrpc_call_deliver_err(call,
hadoop_lerr_alloc(EINVAL, "hrpc_proxy_setup_header: the "
- "request length is too long at %"PRId64 " bytes. The "
+ "request length is too long at %" PRId64 " bytes. The "
"maximum we will send is %d bytes.", buf_len, MAX_SEND_LEN));
return;
}
@@ -270,7 +153,7 @@ void hrpc_proxy_start(struct hrpc_proxy
if (!buf) {
hrpc_call_deliver_err(call,
hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_setup_header: "
- "failed to allocate a buffer of length %"PRId64" bytes.",
+ "failed to allocate a buffer of length %" PRId64 " bytes.",
buf_len));
return;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h
(original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h Thu May
15 20:08:36 2014
@@ -19,13 +19,13 @@
#ifndef HADOOP_CORE_RPC_PROXY_H
#define HADOOP_CORE_RPC_PROXY_H
+#include "rpc/call.h"
+
#include <stdint.h> /* for uint8_t */
#include <uv.h> /* for uv_buf_t */
struct hadoop_err;
struct hrpc_messenger;
-struct hrpc_proxy;
-struct hrpc_proxy_builder;
struct hrpc_response {
uint8_t *pb_base;
@@ -39,65 +39,58 @@ struct hrpc_sync_ctx {
struct hrpc_response resp;
};
-typedef void (*hrpc_raw_cb_t)(struct hrpc_response *,
- struct hadoop_err *, void *);
-
typedef size_t (*hrpc_pack_cb_t)(const void *, uint8_t *);
-/**
- * Allocate a Hadoop proxy builder.
- *
- * @param msgr The Hadoop messenger that this proxy will be associated
- * with.
- * @return A Hadoop proxy builder, or NULL on OOM.
- */
-struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
- struct hrpc_messenger *msgr);
-
-/**
- * Free a Hadoop proxy builder.
- *
- * @param bld The Hadoop proxy builder to free.
- */
-void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld);
-
-/**
- * Set the protocol used by a proxy.
- *
- * @param bld The Hadoop proxy builder.
- * @param proto The protocol string to use. Will be deep-copied.
- */
-void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
- const char *proto);
-
-/**
- * Set the remote that the proxy should connect to.
- *
- * @param bld The Hadoop proxy builder.
- * @param remote The remote. Will be deep-copied.
- */
-void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld,
- const struct sockaddr_in *remote);
+#define RPC_PROXY_USERDATA_MAX 64
-/**
- * Create a Hadoop proxy
- *
- * @param bld The Hadoop proxy builder to use.
- * The builder will be freed, even on failure.
- * @param out (out param) On success, the Hadoop proxy.
- *
- * @return On success, NULL. On error, the error message.
- */
-struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
- struct hrpc_proxy **out);
+struct hrpc_proxy {
+ /**
+ * The messenger that this proxy is associated with.
+ */
+ struct hrpc_messenger *msgr;
+
+ /**
+ * String describing the protocol this proxy speaks.
+ */
+ const char *protocol;
+
+ /**
+ * String describing the username this proxy uses.
+ */
+ const char *username;
+
+ /**
+ * The current call.
+ */
+ struct hrpc_call call;
+
+ /**
+ * A memory area which can be used by the current call.
+ *
+ * This will be null if userdata_len is 0.
+ */
+ uint8_t userdata[RPC_PROXY_USERDATA_MAX];
+};
/**
- * Free a Hadoop proxy.
+ * Initialize a Hadoop proxy.
*
- * @param proxy The Hadoop proxy to free. You must not attempt to free a
- * proxy with a call in progress.
- */
-void hrpc_proxy_free(struct hrpc_proxy *proxy);
+ * @param proxy The Hadoop proxy to initialize.
+ * @param msgr The messenger to associate the proxy with.
+ * This messenger must not be de-allocated while the proxy
+ * still exists.
+ * @param protocol The protocol to use for this proxy.
+ * This string must remain valid for the lifetime of the
+ * proxy.
+ * @param remote The remote to contact. Will be copied.
+ * @param username The username to use.
+ * This string must remain valid for the lifetime of the
+ * proxy.
+ */
+void hrpc_proxy_init(struct hrpc_proxy *proxy,
+ struct hrpc_messenger *msgr,
+ const struct sockaddr_in *remote,
+ const char *protocol, const char *username);
/**
* Mark the proxy as active.
@@ -160,6 +153,9 @@ void hrpc_proxy_sync_cb(struct hrpc_resp
*
* This method will return after queuing up the RPC to be sent.
*
+ * Note: after the proxy has been started, you may __not__ de-allocate the
+ * proxy until the callback has happened.
+ *
* @param proxy The Hadoop proxy to use. A single proxy can
* only make one call at once.
* @param method The method we're calling.
Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c?rev=1595028&r1=1595027&r2=1595028&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c
(original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c Thu
May 15 20:08:36 2014
@@ -67,11 +67,11 @@ static struct hrpc_conn *reuse_idle_conn
memset(&exemplar, 0, sizeof(exemplar));
exemplar.remote = *remote;
- exemplar.protocol = call->protocol;
+ exemplar.protocol = (char*)call->protocol;
conn = RB_NFIND(hrpc_conns, &reactor->conns, &exemplar);
if (!conn)
return NULL;
- if (hrpc_conn_usable(conn, remote, call->protocol)) {
+ if (hrpc_conn_usable(conn, remote, call->protocol, call->username)) {
if (conn->writer.state == HRPC_CONN_WRITE_IDLE) {
RB_REMOVE(hrpc_conns, &reactor->conns, conn);
return conn;