Repository: incubator-htrace Updated Branches: refs/heads/master b4c968740 -> a3c25b94a
HTRACE-159. libhtrace.so: use HRPC endpoint of htraced (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/a3c25b94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/a3c25b94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/a3c25b94 Branch: refs/heads/master Commit: a3c25b94aa5f7c3e04538e4f37ab6dba83b1480d Parents: b4c9687 Author: Colin P. Mccabe <[email protected]> Authored: Thu Apr 30 11:23:52 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Thu Apr 30 11:28:04 2015 -0700 ---------------------------------------------------------------------- htrace-c/src/CMakeLists.txt | 11 +- htrace-c/src/core/conf.c | 4 +- htrace-c/src/core/htrace.h | 14 +- htrace-c/src/receiver/curl.c | 124 ----- htrace-c/src/receiver/curl.h | 60 --- htrace-c/src/receiver/hrpc.c | 512 +++++++++++++++++++ htrace-c/src/receiver/hrpc.h | 90 ++++ htrace-c/src/receiver/htraced.c | 227 ++++---- htrace-c/src/test/htraced_rcv-unit.c | 2 +- htrace-c/src/test/mini_htraced.c | 17 +- htrace-c/src/test/mini_htraced.h | 5 + htrace-c/src/test/rtest.c | 10 +- htrace-c/src/test/string-unit.c | 45 ++ htrace-c/src/util/string.c | 55 ++ htrace-c/src/util/string.h | 22 + htrace-c/src/util/time.c | 8 + htrace-c/src/util/time.h | 9 + .../go/src/org/apache/htrace/client/client.go | 21 +- .../go/src/org/apache/htrace/client/hclient.go | 12 +- .../go/src/org/apache/htrace/htraced/hrpc.go | 89 +++- 20 files changed, 983 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/htrace-c/src/CMakeLists.txt b/htrace-c/src/CMakeLists.txt index f3465f6..3d51968 100644 --- a/htrace-c/src/CMakeLists.txt +++ b/htrace-c/src/CMakeLists.txt @@ -48,8 +48,6 
@@ get_filename_component(HTRACE_ABSPATH "../../htrace-htraced/src/go/build/htrace" get_filename_component(HTRACED_ABSPATH "../../htrace-htraced/src/go/build/htraced" ABSOLUTE) CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/test/test_config.h.cmake ${CMAKE_BINARY_DIR}/test/test_config.h) -find_package(CURL REQUIRED) - find_package(PkgConfig) pkg_check_modules(PC_JSON-C QUIET json-c) find_path(JSON_C_INCLUDE_DIR "json.h" @@ -61,8 +59,7 @@ ELSE(JSON_C_INCLUDE_DIR AND JSON_C_LIBRARY) MESSAGE(FATAL_ERROR "Failed to find libjson-c. Try installing libjson-c with apt-get or yum, or install it manually from http://oss.metaparadigm.com/json-c/") ENDIF(JSON_C_INCLUDE_DIR AND JSON_C_LIBRARY) -include_directories(${CURL_INCLUDE_DIR} - ${CMAKE_BINARY_DIR} +include_directories(${CMAKE_BINARY_DIR} ${CMAKE_SOURCE_DIR}) if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux") @@ -77,7 +74,7 @@ set(SRC_ALL core/htracer.c core/scope.c core/span.c - receiver/curl.c + receiver/hrpc.c receiver/htraced.c receiver/local_file.c receiver/noop.c @@ -94,9 +91,7 @@ set(SRC_ALL util/time.c ) -set(DEPS_ALL - ${CURL_LIBRARY} - pthread) +set(DEPS_ALL pthread) # The unit test version of the library, which exposes all symbols. 
add_library(htrace_test STATIC http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/core/conf.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/core/conf.c b/htrace-c/src/core/conf.c index 936e224..a561906 100644 --- a/htrace-c/src/core/conf.c +++ b/htrace-c/src/core/conf.c @@ -29,7 +29,9 @@ #define HTRACE_DEFAULT_CONF_KEYS (\ HTRACE_PROB_SAMPLER_FRACTION_KEY "=0.01"\ ";" HTRACED_BUFFER_SIZE_KEY "=67108864"\ - ";" HTRACED_SEND_TIMEOUT_MS_KEY "=120000"\ + ";" HTRACED_FLUSH_INTERVAL_MS_KEY "=120000"\ + ";" HTRACED_WRITE_TIMEO_MS_KEY "=60000"\ + ";" HTRACED_READ_TIMEO_MS_KEY "=60000"\ ";" HTRACE_PROCESS_ID "=%{tname}/%{ip}"\ ";" HTRACED_ADDRESS_KEY "=localhost:9095"\ ) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/core/htrace.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/core/htrace.h b/htrace-c/src/core/htrace.h index cdebd31..4e2f1f9 100644 --- a/htrace-c/src/core/htrace.h +++ b/htrace-c/src/core/htrace.h @@ -115,9 +115,19 @@ extern "C" { #define HTRACED_ADDRESS_KEY "htraced.address" /** - * The timeout to use when sending spans to the htraced server. + * The maximum length of time to go before flushing spans to the htraced server. */ -#define HTRACED_SEND_TIMEOUT_MS_KEY "htraced.send.timeout.ms" +#define HTRACED_FLUSH_INTERVAL_MS_KEY "htraced.flush.interval.ms" + +/** + * The TCP write timeout to use when communicating with the htraced server. + */ +#define HTRACED_WRITE_TIMEO_MS_KEY "htraced.write.timeo.ms" + +/** + * The TCP read timeout to use when communicating with the htraced server. + */ +#define HTRACED_READ_TIMEO_MS_KEY "htraced.read.timeo.ms" /** * The size of the circular buffer to use in the htraced receiver. 
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/curl.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/curl.c b/htrace-c/src/receiver/curl.c deleted file mode 100644 index 0905dd4..0000000 --- a/htrace-c/src/receiver/curl.c +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "core/conf.h" -#include "util/log.h" - -#include <curl/curl.h> -#include <pthread.h> -#include <stdint.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -/** - * Unfortunately, libcurl requires a non-threadsafe initialization function to - * be called before it is usable. This is unfortunate for a library like - * libhtrace, which is designed to be used in a multi-threaded context. - * - * This mutex protects us against an application creating two htraced receivers - * at around the same time, and calling that non-threadsafe initialization - * function. - * - * Of course, this doesn't protect us against the application also initializing - * libcurl. 
We can protect against that by statically linking a private copy of - * libcurl into libhtrace, so that we will be initializing and using our own - * private copy of libcurl rather than the application's. - */ -static pthread_mutex_t g_curl_refcnt_lock = PTHREAD_MUTEX_INITIALIZER; - -/** - * The current number of CURL handles that are open. - */ -static int64_t g_curl_refcnt; - -static int curl_addref(struct htrace_log *lg) -{ - int success = 0; - CURLcode curl_err = 0; - - pthread_mutex_lock(&g_curl_refcnt_lock); - if (g_curl_refcnt >= 1) { - g_curl_refcnt++; - success = 1; - goto done; - } - curl_err = curl_global_init(CURL_GLOBAL_ALL); - if (curl_err) { - htrace_log(lg, "curl_global_init failed: error %d (%s)\n", - curl_err, curl_easy_strerror(curl_err)); - goto done; - } - htrace_log(lg, "successfully initialized libcurl...\n"); - g_curl_refcnt = 1; - success = 1; - -done: - pthread_mutex_unlock(&g_curl_refcnt_lock); - return success; -} - -static void curl_unref(struct htrace_log *lg) -{ - pthread_mutex_lock(&g_curl_refcnt_lock); - g_curl_refcnt--; - if (g_curl_refcnt > 0) { - goto done; - } - curl_global_cleanup(); - htrace_log(lg, "shut down libcurl...\n"); -done: - pthread_mutex_unlock(&g_curl_refcnt_lock); -} - -CURL* htrace_curl_init(struct htrace_log *lg, const struct htrace_conf *conf) -{ - CURL *curl = NULL; - int success = 0; - - if (!curl_addref(lg)) { - return NULL; - } - curl = curl_easy_init(); - if (!curl) { - htrace_log(lg, "curl_easy_init failed.\n"); - goto done; - } - success = 1; - -done: - if (!success) { - if (curl) { - curl_easy_cleanup(curl); - } - curl_unref(lg); - return NULL; - } - return curl; -} - -void htrace_curl_free(struct htrace_log *lg, CURL *curl) -{ - if (!curl) { - return; - } - curl_easy_cleanup(curl); - curl_unref(lg); -} - -// vim:ts=4:sw=4:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/curl.h ---------------------------------------------------------------------- diff --git 
a/htrace-c/src/receiver/curl.h b/htrace-c/src/receiver/curl.h deleted file mode 100644 index 9249cd3..0000000 --- a/htrace-c/src/receiver/curl.h +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef APACHE_HTRACE_RECEIVER_CURL_H -#define APACHE_HTRACE_RECEIVER_CURL_H - -/** - * @file curl.h - * - * Utility functions wrapping libcurl. - * - * This is an internal header, not intended for external use. - */ - -#include <curl/curl.h> // for the CURL type - -struct htrace_conf; -struct htrace_log; - -/** - * Initialize a libcurl handle. - * - * This function also takes care of calling curl_global_init if necessary. - * - * @param lg The HTrace log to use for error messages. - * @param conf The HTrace configuration to use. - * - * @return A libcurl handle, or NULL on failure. - */ -CURL* htrace_curl_init(struct htrace_log *lg, const struct htrace_conf *conf); - -/** - * Free a libcurl handle. - * - * This function also takes care of calling curl_global_cleanup if necessary. - * - * @param lg The HTrace log to use for error messages. - * - * @param curl The libcurl handle. 
- */ -void htrace_curl_free(struct htrace_log *lg, CURL *curl); - -#endif - -// vim: ts=4: sw=4: et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/hrpc.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/hrpc.c b/htrace-c/src/receiver/hrpc.c new file mode 100644 index 0000000..32a1f4a --- /dev/null +++ b/htrace-c/src/receiver/hrpc.c @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "receiver/hrpc.h" +#include "util/log.h" +#include "util/string.h" +#include "util/time.h" + +#include <errno.h> +#include <fcntl.h> +#include <inttypes.h> +#include <netdb.h> +#include <netinet/in.h> +#include <pthread.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#if defined(__OpenBSD__) +#include <sys/types.h> +#define be16toh(x) betoh16(x) +#define be32toh(x) betoh32(x) +#define be64toh(x) betoh64(x) +#elif defined(__NetBSD__) || defined(__FreeBSD__) +#include <sys/endian.h> +#else +#include <endian.h> +#endif + +/** + * @file hrpc.c + * + * Implements sending messages via HRPC. 
+ */ + +#define HRPC_MAGIC 0x48545243U + +#define MAX_HRPC_ERROR_LENGTH (4 * 1024 * 1024) + +#define MAX_HRPC_BODY_LENGTH (64 * 1024 * 1024) + +#define DEFAULT_HTRACED_HRPC_PORT 9075 + +#define ADDR_STR_MAX (2 + INET6_ADDRSTRLEN + sizeof(":65536")) + +struct hrpc_client { + /** + * The HTrace log object. + */ + struct htrace_log *lg; + + /** + * The tcp write timeout in milliseconds. + */ + uint64_t write_timeo_ms; + + /** + * The tcp read timeout in milliseconds. + */ + uint64_t read_timeo_ms; + + /** + * The hostname or IP address. Malloced. + */ + char *host; + + /** + * The port. + */ + int port; + + /** + * The host:port string. Malloced. + */ + char *endpoint; + + /** + * Socket of current open connection, or -1 if there is no currently open + * connection. + */ + int sock; + + /** + * The sequence number on the connection. + */ + uint64_t seq; + + /** + * The remote IP address. + */ + char addr_str[ADDR_STR_MAX]; +}; + +struct hrpc_req_header { + uint32_t magic; + uint32_t method_id; + uint64_t seq; + uint32_t length; +} __attribute__((packed,aligned(4))); + +struct hrpc_resp_header { + uint64_t seq; + uint32_t method_id; + uint32_t err_length; + uint32_t length; +} __attribute__((packed,aligned(4))); + + +static int hrpc_client_open_conn(struct hrpc_client *hcli); +static int try_connect(struct hrpc_client *hcli, struct addrinfo *p); +static int set_socket_read_and_write_timeout(struct hrpc_client *hcli, + int sock); +static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id, + const void *req, size_t req_len, uint64_t *seq); +static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id, + uint64_t seq, char **err, void **resp, + size_t *resp_len); + +struct hrpc_client *hrpc_client_alloc(struct htrace_log *lg, + uint64_t write_timeo_ms, uint64_t read_timeo_ms, + const char *endpoint) +{ + struct hrpc_client *hcli; + + hcli = calloc(1, sizeof(*hcli)); + if (!hcli) { + htrace_log(lg, "Failed to allocate memory for the HRPC 
client.\n"); + goto error; + } + hcli->lg = lg; + hcli->write_timeo_ms = write_timeo_ms; + hcli->read_timeo_ms = read_timeo_ms; + hcli->sock = -1; + hcli->endpoint = strdup(endpoint); + if (!hcli->endpoint) { + htrace_log(lg, "Failed to allocate memory for the endpoint string.\n"); + goto error; + } + if (!parse_endpoint(lg, endpoint, DEFAULT_HTRACED_HRPC_PORT, + &hcli->host, &hcli->port)) { + goto error; + } + return hcli; + +error: + if (hcli) { + free(hcli->host); + free(hcli->endpoint); + free(hcli); + } + return NULL; +} + +void hrpc_client_free(struct hrpc_client *hcli) +{ + if (!hcli) { + return; + } + if (hcli->sock >= 0) { + close(hcli->sock); + hcli->sock = -1; + } + free(hcli->host); + free(hcli->endpoint); + free(hcli); +} + +int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id, + const void *req, size_t req_len, + char **err, void **resp, size_t *resp_len) +{ + uint64_t seq; + + if (hcli->sock < 0) { + if (!hrpc_client_open_conn(hcli)) { + goto error; + } + htrace_log(hcli->lg, "hrpc_client_call: successfully opened connection\n"); + } else { + htrace_log(hcli->lg, "hrpc_client_call: connection was already open\n"); + } + if (!hrpc_client_send_req(hcli, method_id, req, req_len, &seq)) { + goto error; + } + htrace_log(hcli->lg, "hrpc_client_call: waiting for response\n"); + if (!hrpc_client_rcv_resp(hcli, method_id, seq, err, resp, resp_len)) { + goto error; + } + return 1; + +error: + if (hcli->sock >= 0) { + close(hcli->sock); + hcli->sock = -1; + } + return 0; +} + +static int hrpc_client_open_conn(struct hrpc_client *hcli) +{ + int res, sock = -1; + struct addrinfo hints, *list, *info; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + res = getaddrinfo(hcli->host, NULL, &hints, &list); + if (res) { + htrace_log(hcli->lg, "hrpc_client_open_conn: " + "getaddrinfo(%s) error %d: %s\n", + hcli->host, res, gai_strerror(res)); + return 0; + } + for (info = list; info; info = info->ai_next) 
{ + sock = try_connect(hcli, info); + if (sock >= 0) { + break; + } + } + freeaddrinfo(list); + if (!info) { + htrace_log(hcli->lg, "hrpc_client_open_conn(%s): failed to connect.\n", + hcli->host); + return 0; + } + hcli->sock = sock; + return 1; +} + +static int set_port(struct hrpc_client *hcli, struct sockaddr *addr, + int ai_family) +{ + switch (ai_family) { + case AF_INET: { + struct sockaddr_in *in4 = (struct sockaddr_in*)addr; + in4->sin_port = htons(hcli->port); + return 1; + } + case AF_INET6: { + struct sockaddr_in6 *in6 = (struct sockaddr_in6*)addr; + in6->sin6_port = htons(hcli->port); + return 1; + } + default: + htrace_log(hcli->lg, "try_connect(%s): set_port %d failed: unknown " + "ai_family %d\n", hcli->addr_str, hcli->port, ai_family); + return 0; + } +} + +static int try_connect(struct hrpc_client *hcli, struct addrinfo *p) +{ + int e, sock = -1; + char ip[INET6_ADDRSTRLEN]; + + e = getnameinfo(p->ai_addr, p->ai_addrlen, + ip, sizeof(ip), 0, 0, NI_NUMERICHOST); + if (e) { + htrace_log(hcli->lg, "try_connect: getnameinfo failed. 
error " + "%d: %s\n", e, gai_strerror(e)); + return 0; + } + snprintf(hcli->addr_str, ADDR_STR_MAX, "%s:%d", ip, hcli->port); + if (!set_port(hcli, p->ai_addr, p->ai_family)) { + goto error; + } + sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (sock < 0) { + e = errno; + htrace_log(hcli->lg, "try_connect(%s): failed to create new " + "socket: error %d (%s)\n", hcli->addr_str, e, terror(e)); + goto error; + } + if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) { + e = errno; + htrace_log(hcli->lg, "try_connect(%s): fcntl(FD_CLOEXEC) " + "failed: error %d (%s)\n", hcli->addr_str, e, terror(e)); + goto error; + } + if (!set_socket_read_and_write_timeout(hcli, sock)) { + goto error; + } + if (connect(sock, p->ai_addr, p->ai_addrlen) < 0) { + e = errno; + htrace_log(hcli->lg, "try_connect(%s): connect " + "failed: error %d (%s)\n", hcli->addr_str, e, terror(e)); + goto error; + } + return sock; + +error: + if (sock >= 0) { + close(sock); + } + return -1; +} + +static int set_socket_read_and_write_timeout(struct hrpc_client *hcli, + int sock) +{ + struct timeval tv; + + ms_to_timeval(hcli->read_timeo_ms, &tv); + if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + int e = errno; + htrace_log(hcli->lg, "setsockopt(%d, SO_RCVTIMEO, %"PRId64") failed: " + "error %d (%s)\n", sock, hcli->read_timeo_ms, e, terror(e)); + return 0; + } + + ms_to_timeval(hcli->write_timeo_ms, &tv); + if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) { + int e = errno; + htrace_log(hcli->lg, "setsockopt(%d, SO_SNDTIMEO, %"PRId64") failed: " + "error %d (%s)\n", sock, hcli->write_timeo_ms, e, terror(e)); + return 0; + } + return 1; +} + +static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id, + const void *req, size_t req_len, uint64_t *seq) +{ + struct hrpc_req_header hdr; + struct iovec iov[2]; + + hdr.magic = htole64(HRPC_MAGIC); + hdr.method_id = htole32(method_id); + *seq = hcli->seq++; + hdr.seq = htole64(*seq); + 
hdr.length = htole32(req_len); + iov[0].iov_base = &hdr; + iov[0].iov_len = sizeof(hdr); + iov[1].iov_base = (void*)req; + iov[1].iov_len = req_len; + + while (1) { + ssize_t res = writev(hcli->sock, iov, sizeof(iov)/sizeof(iov[0])); + int i; + if (res < 0) { + int e = errno; + if (e == EINTR) { + continue; + } + htrace_log(hcli->lg, "hrpc_client_send_req: writev error: " + "error %d: %s\n", e, terror(e)); + return 0; + } + i = 0; + while (res > 0) { + if (iov[i].iov_len < res) { + res -= iov[i].iov_len; + iov[i].iov_len = 0; + } else { + iov[i].iov_len -= res; + res = 0; + } + if (++i >= (sizeof(iov)/sizeof(iov[0]))) { + if (res == 0) { + return 1; + } + htrace_log(hcli->lg, "hrpc_client_send_req: unexpectedly " + "large writev return.\n"); + return 0; + } + } + } +} + +static int safe_read(int fd, void *buf, size_t amt) +{ + uint8_t *b = buf; + int e, res, nread = 0; + + while (1) { + res = read(fd, b + nread, amt - nread); + if (res <= 0) { + if (res == 0) { + return nread; + } + e = errno; + if (e == EINTR) { + continue; + } + return -e; + } + nread += res; + if (nread >= amt) { + return nread; + } + } +} + +static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id, + uint64_t seq, char **err_out, void **resp_out, + size_t *resp_len) +{ + int res; + struct hrpc_resp_header hdr; + uint64_t resp_seq; + uint32_t resp_method_id, err_length, length; + char *err = NULL, *resp = NULL; + + res = safe_read(hcli->sock, &hdr, sizeof(hdr)); + if (res < 0) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading " + "response header: %d (%s)\n", hcli->addr_str, -res, + terror(-res)); + goto error; + } + if (res != sizeof(hdr)) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF " + "reading response header.\n", hcli->addr_str); + goto error; + } + resp_seq = le64toh(hdr.seq); + if (resp_seq != seq) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected sequence " + "ID 0x%"PRIx64", but got sequence ID 0x%"PRId64".\n", + 
hcli->addr_str, seq, resp_seq); + goto error; + } + resp_method_id = le32toh(hdr.method_id); + if (resp_method_id != method_id) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected method " + "ID 0x%"PRIx32", but got method ID 0x%"PRId32".\n", + hcli->addr_str, method_id, resp_method_id); + goto error; + } + err_length = le32toh(hdr.err_length); + if (err_length > MAX_HRPC_ERROR_LENGTH) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error length was " + "%"PRId32", but the maximum error length is %"PRId32".", + hcli->addr_str, err_length, MAX_HRPC_ERROR_LENGTH); + goto error; + } + if (err_length > 0) { + err = malloc(err_length + 1); + res = safe_read(hcli->sock, err, err_length); + if (res < 0) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading " + "error string: %d (%s)\n", hcli->addr_str, -res, + terror(-res)); + goto error; + } + if (res != err_length) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF " + "reading error string.\n", hcli->addr_str); + goto error; + } + err[err_length] = '\0'; + } + length = le32toh(hdr.length); + if (length > MAX_HRPC_BODY_LENGTH) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): body length was " + "%"PRId32", but the maximum body length is %"PRId32".", + hcli->addr_str, length, MAX_HRPC_BODY_LENGTH); + goto error; + } + if (length > 0) { + resp = malloc(length); + res = safe_read(hcli->sock, resp, length); + if (res < 0) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading " + "body: %d (%s)\n", hcli->addr_str, -res, terror(-res)); + goto error; + } + if (res != length) { + htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF " + "reading body.\n", hcli->addr_str); + goto error; + } + } + *err_out = err; + *resp_out = resp; + *resp_len = length; + return 1; + +error: + free(err); + free(resp); + *err_out = NULL; + *resp_out = NULL; + *resp_len = 0; + return 0; +} + +const char *hrpc_client_get_endpoint(struct hrpc_client *hcli) +{ + return hcli->endpoint; 
+} + +// vim:ts=4:sw=4:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/hrpc.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/hrpc.h b/htrace-c/src/receiver/hrpc.h new file mode 100644 index 0000000..8ec20be --- /dev/null +++ b/htrace-c/src/receiver/hrpc.h @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef APACHE_HTRACE_RECEIVER_HRPC +#define APACHE_HTRACE_RECEIVER_HRPC + +/** + * @file hrpc.h + * + * Functions related to HRPC. + * + * This is an internal header, not intended for external use. + */ + +#include <stdint.h> +#include <unistd.h> + +#define METHOD_ID_WRITE_SPANS 0x1 + +struct htrace_log; + +/** + * Create an HRPC client. + * + * @param lg The log object to use for the HRPC client. + * @param write_timeo_ms The TCP write timeout to use. + * @param read_timeo_ms The TCP read timeout to use. + * @param endpoint The hostname and port, separated by a colon. + * + * @return NULL on failure; the hrpc_client otherwise. 
+ */ +struct hrpc_client *hrpc_client_alloc(struct htrace_log *lg, + uint64_t write_timeo_ms, uint64_t read_timeo_ms, + const char *endpoint); + +/** + * Free the HRPC client. + * + * @param hcli The HRPC client. + */ +void hrpc_client_free(struct hrpc_client *hcli); + +/** + * Make a blocking call using the HRPC client. + * + * @param hcli The HRPC client. + * @param method_id The method ID to use. + * @param req The request buffer to send. + * @param req_len The size of the request buffer to send. + * @param err (out param) Will be set to a malloced + * NULL-terminated string if the server returned an + * error response. NULL otherwise. + * @param resp (out param) The response body. Will be set to the + * response body if the function returns nonzero. + * @param resp_len (out param) The length of the response body. + * + * @return 0 on failure, 1 on success. + */ +int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id, + const void *req, size_t req_len, + char **err, void **resp, size_t *resp_len); + +/** + * Get the endpoint for this HRPC client. + * + * @param hcli The HRPC client. + * + * @return The endpoint. This string will be valid for the + * lifetime of the HRPC client. 
+ */ +const char *hrpc_client_get_endpoint(struct hrpc_client *hcli); + +#endif + +// vim: ts=4: sw=4: et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/htraced.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c index ea61541..7e4af81 100644 --- a/htrace-c/src/receiver/htraced.c +++ b/htrace-c/src/receiver/htraced.c @@ -20,14 +20,14 @@ #include "core/htrace.h" #include "core/htracer.h" #include "core/span.h" -#include "receiver/curl.h" +#include "receiver/hrpc.h" #include "receiver/receiver.h" #include "test/test.h" #include "util/log.h" +#include "util/string.h" #include "util/time.h" #include <errno.h> -#include <curl/curl.h> #include <inttypes.h> #include <pthread.h> #include <stdint.h> @@ -56,15 +56,25 @@ #define HTRACED_MAX_BUFFER_SIZE 0x7ffffffffffffffLL /** - * The minimum number of milliseconds to allow for send_timeo_ms. + * The minimum number of milliseconds to allow for flush_interval_ms. */ -#define HTRACED_SEND_TIMEO_MS_MIN 30000LL +#define HTRACED_FLUSH_INTERVAL_MS_MIN 30000LL /** - * The maximum number of milliseconds to allow for send_timeo_ms. + * The maximum number of milliseconds to allow for flush_interval_ms. * This is mainly to avoid overflow. */ -#define HTRACED_SEND_TIMEO_MS_MAX 86400000LL +#define HTRACED_FLUSH_INTERVAL_MS_MAX 86400000LL + +/** + * The minimum number of milliseconds to allow for tcp write timeouts. + */ +#define HTRACED_WRITE_TIMEO_MS_MIN 50LL + +/** + * The minimum number of milliseconds to allow for tcp read timeouts. + */ +#define HTRACED_READ_TIMEO_MS_MIN 50LL /** * The maximum size of the message we will send over the wire. @@ -108,16 +118,10 @@ struct htraced_rcv { struct htracer *tracer; /** - * The HTraced server URL. - * Dynamically allocated. 
- */ - char *url; - - /** * Buffered span data becomes eligible to be sent even if there isn't much * in the buffer after this timeout elapses. */ - uint64_t send_timeo_ms; + uint64_t flush_interval_ms; /** * The maximum number of bytes we will buffer before waking the sending @@ -127,9 +131,9 @@ struct htraced_rcv { uint64_t send_threshold; /** - * The CURL handle. + * The HRPC client. */ - CURL *curl; + struct hrpc_client *hcli; /** * Length of the circular buffer. @@ -184,15 +188,35 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now); static uint64_t cbuf_used(const struct htraced_rcv *rcv); static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv); +static uint64_t htraced_get_bounded_u64(struct htrace_log *lg, + const struct htrace_conf *cnf, const char *prop, + uint64_t min, uint64_t max) +{ + uint64_t val = htrace_conf_get_u64(lg, cnf, prop); + if (val < min) { + htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRId64 + ". Using minimum value of %"PRId64 " instead.\n", + prop, val, min); + return min; + } else if (val > max) { + htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRId64 + ". Using maximum value of %"PRId64 " instead.\n", + prop, val, max); + return max; + } + return val; +} + static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer, const struct htrace_conf *conf) { struct htraced_rcv *rcv; - const char *url; + const char *endpoint; int ret; + uint64_t write_timeo_ms, read_timeo_ms; - url = htrace_conf_get(conf, HTRACED_ADDRESS_KEY); - if (!url) { + endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY); + if (!endpoint) { htrace_log(tracer->lg, "htraced_rcv_create: no value found for %s. 
" "You must set this configuration key to the " "hostname:port identifying the htraced server.\n", @@ -208,27 +232,21 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer, rcv->base.ty = &g_htraced_rcv_ty; rcv->shutdown = 0; rcv->tracer = tracer; - if (asprintf(&rcv->url, "%s/writeSpans", url) < 0) { - rcv->url = NULL; + + rcv->flush_interval_ms = htraced_get_bounded_u64(tracer->lg, conf, + HTRACED_FLUSH_INTERVAL_MS_KEY, HTRACED_FLUSH_INTERVAL_MS_MIN, + HTRACED_FLUSH_INTERVAL_MS_MAX); + write_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf, + HTRACED_WRITE_TIMEO_MS_KEY, HTRACED_WRITE_TIMEO_MS_MIN, + 0x7fffffffffffffffULL); + read_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf, + HTRACED_READ_TIMEO_MS_KEY, HTRACED_READ_TIMEO_MS_MIN, + 0x7fffffffffffffffULL); + rcv->hcli = hrpc_client_alloc(tracer->lg, write_timeo_ms, + read_timeo_ms, endpoint); + if (!rcv->hcli) { goto error_free_rcv; } - rcv->send_timeo_ms = htrace_conf_get_u64(tracer->lg, conf, - HTRACED_SEND_TIMEOUT_MS_KEY); - if (rcv->send_timeo_ms < HTRACED_SEND_TIMEO_MS_MIN) { - htrace_log(tracer->lg, "htraced_rcv_create: invalid send timeout of %" - PRId64 " ms. Setting the minimum timeout of %lld" - " ms instead.\n", rcv->send_timeo_ms, HTRACED_SEND_TIMEO_MS_MIN); - rcv->send_timeo_ms = HTRACED_SEND_TIMEO_MS_MIN; - } else if (rcv->send_timeo_ms > HTRACED_SEND_TIMEO_MS_MAX) { - htrace_log(tracer->lg, "htraced_rcv_create: invalid send timeout of %" - PRId64 " ms. 
Setting the maximum timeout of %lld" - " ms instead.\n", rcv->send_timeo_ms, HTRACED_SEND_TIMEO_MS_MAX); - rcv->send_timeo_ms = HTRACED_SEND_TIMEO_MS_MAX; - } - rcv->curl = htrace_curl_init(tracer->lg, conf); - if (!rcv->curl) { - goto error_free_url; - } rcv->clen = htrace_conf_get_u64(tracer->lg, conf, HTRACED_BUFFER_SIZE_KEY); if (rcv->clen < HTRACED_MIN_BUFFER_SIZE) { htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64 @@ -245,7 +263,7 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer, if (!rcv->cbuf) { htrace_log(tracer->lg, "htraced_rcv_create: failed to malloc %"PRId64 " bytes for the htraced circular buffer.\n", rcv->clen); - goto error_free_curl; + goto error_free_hcli; } // Send when the buffer gets 1/4 full. rcv->send_threshold = rcv->clen * 0.25; @@ -274,10 +292,12 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer, "error %d: %s\n", ret, terror(ret)); goto error_free_cvar; } - htrace_log(tracer->lg, "Initialized htraced receiver with url=%s, " - "send_timeo_ms=%" PRId64 ", send_threshold=%" PRId64 ", clen=%" - PRId64 ".\n", rcv->url, rcv->send_timeo_ms, rcv->send_threshold, - rcv->clen); + htrace_log(tracer->lg, "Initialized htraced receiver for %s" + ", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64 + ", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64 + ", clen=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli), + rcv->flush_interval_ms, rcv->send_threshold, + write_timeo_ms, read_timeo_ms, rcv->clen); return (struct htrace_rcv*)rcv; error_free_cvar: @@ -288,10 +308,8 @@ error_free_sbuf: free(rcv->sbuf); error_free_cbuf: free(rcv->cbuf); -error_free_curl: - htrace_curl_free(tracer->lg, rcv->curl); -error_free_url: - free(rcv->url); +error_free_hcli: + hrpc_client_free(rcv->hcli); error_free_rcv: free(rcv); error: @@ -324,7 +342,7 @@ void* run_htraced_xmit_manager(void *data) // because of send_timeo_ms. 
// * A writer to signal that we should wake up because enough bytes are // buffered. - wakeup = now + (rcv->send_timeo_ms / 2); + wakeup = now + (rcv->flush_interval_ms / 2); ms_to_timespec(wakeup, &wakeup_ts); ret = pthread_cond_timedwait(&rcv->cond, &rcv->lock, &wakeup_ts); if ((ret != 0) && (ret != ETIMEDOUT)) { @@ -356,7 +374,7 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now) // We have buffered a lot of bytes, so let's send. return 1; } - if (now - rcv->last_send_ms > rcv->send_timeo_ms) { + if (now - rcv->last_send_ms > rcv->flush_interval_ms) { // It's been too long since the last transmission, so let's send. if (used > 0) { return 1; @@ -376,55 +394,25 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now) static int htraced_xmit_impl(struct htraced_rcv *rcv, int32_t slen) { struct htrace_log *lg = rcv->tracer->lg; - CURLcode res; - char *pid_header = NULL; - struct curl_slist *headers = NULL; - int ret = 0; - - // Disable the use of SIGALARM to interrupt DNS lookups. - curl_easy_setopt(rcv->curl, CURLOPT_NOSIGNAL, 1); - // Do not use a global DNS cache. - curl_easy_setopt(rcv->curl, CURLOPT_DNS_USE_GLOBAL_CACHE, 0); - // Disable verbosity. - curl_easy_setopt(rcv->curl, CURLOPT_VERBOSE, 0); - // The user agent is libhtraced. 
- curl_easy_setopt(rcv->curl, CURLOPT_USERAGENT, "libhtraced"); - // Set URL - curl_easy_setopt(rcv->curl, CURLOPT_URL, rcv->url); - // Set POST - curl_easy_setopt(rcv->curl, CURLOPT_POST, 1L); - // Set the size that we're copying from rcv->sbuf - curl_easy_setopt(rcv->curl, CURLOPT_POSTFIELDSIZE, (long)slen); - if (asprintf(&pid_header, "htrace-pid: %s", rcv->tracer->prid) < 0) { - htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating htrace-pid\n", - rcv->url); - goto done; - } - curl_easy_setopt(rcv->curl, CURLOPT_POSTFIELDS, rcv->sbuf); - headers = curl_slist_append(headers, pid_header); - if (!headers) { - htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating headers\n", - rcv->url); - return 0; - } - headers = curl_slist_append(headers, "Content-Type: application/json"); - if (!headers) { - htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating headers\n", - rcv->url); - return 0; - } - curl_easy_setopt(rcv->curl, CURLOPT_HTTPHEADER, headers); - res = curl_easy_perform(rcv->curl); - if (res != CURLE_OK) { - htrace_log(lg, "htraced_xmit(%s) failed: error %lld (%s)\n", - rcv->url, (long long)res, curl_easy_strerror(res)); + int res, retval = 0; + char *prequel = NULL, *err = NULL, *resp = NULL; + size_t resp_len = 0; + + res = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS, + rcv->sbuf, slen, &err, (void**)&resp, &resp_len); + if (!res) { + htrace_log(lg, "htrace_xmit_impl: hrpc_client_call failed.\n"); + retval = 0; + } else if (err) { + htrace_log(lg, "htrace_xmit_impl: server returned error: %s\n", err); + retval = 0; + } else { + retval = 1; } - ret = res == CURLE_OK; -done: - curl_easy_reset(rcv->curl); - free(pid_header); - curl_slist_free_all(headers); - return ret; + free(prequel); + free(err); + free(resp); + return retval; } static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now) @@ -446,7 +434,7 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now) tries++; retry = (tries < HTRACED_MAX_SEND_TRIES); 
htrace_log(rcv->tracer->lg, "htraced_xmit(%s) failed on try %d. %s\n", - rcv->url, tries, + hrpc_client_get_endpoint(rcv->hcli), tries, (retry ? "Retrying after a delay." : "Giving up.")); if (!retry) { break; @@ -471,15 +459,21 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now) */ static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv) { - int32_t rem = HTRACED_MAX_MSG_LEN; + const char * const SUFFIX = "]}"; + int SUFFIX_LEN = sizeof(SUFFIX) - 1; + int rem = HTRACED_MAX_MSG_LEN - SUFFIX_LEN; size_t amt; + char *sbuf = (char*)rcv->sbuf; + fwdprintf(&sbuf, &rem, "{\"DefaultPid\":\"%s\",\"Spans\":[", + rcv->tracer->prid); if (rcv->cstart < rcv->cend) { amt = rcv->cend - rcv->cstart; if (amt > rem) { amt = rem; } - memcpy(rcv->sbuf, rcv->cbuf + rcv->cstart, amt); + memcpy(sbuf, rcv->cbuf + rcv->cstart, amt); + sbuf += amt; rem -= amt; rcv->cstart += amt; } else { @@ -487,7 +481,8 @@ static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv) if (amt > rem) { amt = rem; } - memcpy(rcv->sbuf, rcv->cbuf + rcv->cstart, amt); + memcpy(sbuf, rcv->cbuf + rcv->cstart, amt); + sbuf += amt; rem -= amt; rcv->cstart += amt; if (rem > 0) { @@ -495,11 +490,17 @@ static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv) if (amt > rem) { amt = rem; } - memcpy(rcv->sbuf, rcv->cbuf, amt); + memcpy(sbuf, rcv->cbuf, amt); + sbuf += amt; rem -= amt; rcv->cstart = amt; } } + // overwrite last comma + rem++; + sbuf--; + rem += SUFFIX_LEN; + fwdprintf(&sbuf, &rem, "%s", SUFFIX); return HTRACED_MAX_MSG_LEN - rem; } @@ -527,11 +528,6 @@ static void htraced_rcv_add_span(struct htrace_rcv *r, struct htraced_rcv *rcv = (struct htraced_rcv *)r; struct htrace_log *lg = rcv->tracer->lg; - { - char buf[4096]; - span_json_sprintf(span, sizeof(buf), buf); - } - json_len = span_json_size(span); tries = 0; do { @@ -559,21 +555,28 @@ static void htraced_rcv_add_span(struct htrace_rcv *r, if (rem < json_len) { // Handle a 'torn write' where the circular buffer loops around to the // beginning in the 
middle of the write. - char *temp = alloca(json_len); + char *temp = malloc(json_len); + if (!temp) { + htrace_log(lg, "htraced_rcv_add_span: failed to malloc %d byte " + "buffer for torn write.\n", json_len); + goto done; + } span_json_sprintf(span, json_len, temp); - temp[json_len - 1] = '\n'; + temp[json_len - 1] = ','; memcpy(rcv->cbuf + rcv->cend, temp, rem); memcpy(rcv->cbuf, temp + rem, json_len - rem); rcv->cend = json_len - rem; + free(temp); } else { span_json_sprintf(span, json_len, rcv->cbuf + rcv->cend); - rcv->cbuf[rcv->cend + json_len - 1] = '\n'; + rcv->cbuf[rcv->cend + json_len - 1] = ','; rcv->cend += json_len; } used += json_len; if (used > rcv->send_threshold) { pthread_cond_signal(&rcv->cond); } +done: pthread_mutex_unlock(&rcv->lock); } @@ -611,7 +614,8 @@ static void htraced_rcv_free(struct htrace_rcv *r) return; } lg = rcv->tracer->lg; - htrace_log(lg, "Shutting down htraced receiver with url=%s\n", rcv->url); + htrace_log(lg, "Shutting down htraced receiver for %s\n", + hrpc_client_get_endpoint(rcv->hcli)); pthread_mutex_lock(&rcv->lock); rcv->shutdown = 1; pthread_cond_signal(&rcv->cond); @@ -621,10 +625,9 @@ static void htraced_rcv_free(struct htrace_rcv *r) htrace_log(lg, "htraced_rcv_free: pthread_join " "error %d: %s\n", ret, terror(ret)); } - free(rcv->url); free(rcv->cbuf); free(rcv->sbuf); - htrace_curl_free(lg, rcv->curl); + hrpc_client_free(rcv->hcli); ret = pthread_mutex_destroy(&rcv->lock); if (ret) { htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy " http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/htraced_rcv-unit.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/htraced_rcv-unit.c b/htrace-c/src/test/htraced_rcv-unit.c index 29f62d1..d5c83ec 100644 --- a/htrace-c/src/test/htraced_rcv-unit.c +++ b/htrace-c/src/test/htraced_rcv-unit.c @@ -51,7 +51,7 @@ static int htraced_rcv_test(struct rtest *rt) ht->root_dir, "spans.json")); 
EXPECT_INT_GE(0, asprintf(&conf_str, "%s=%s;%s=%s", HTRACE_SPAN_RECEIVER_KEY, "htraced", - HTRACED_ADDRESS_KEY, ht->htraced_http_addr)); + HTRACED_ADDRESS_KEY, ht->htraced_hrpc_addr)); EXPECT_INT_ZERO(rt->run(rt, conf_str)); start_ms = monotonic_now_ms(NULL); // http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/mini_htraced.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/mini_htraced.c b/htrace-c/src/test/mini_htraced.c index 31b5484..d97a01b 100644 --- a/htrace-c/src/test/mini_htraced.c +++ b/htrace-c/src/test/mini_htraced.c @@ -318,6 +318,9 @@ static void mini_htraced_write_conf_file(struct mini_htraced *ht, if (mini_htraced_write_conf_key(fp, "web.address", "127.0.0.1:0")) { goto ioerror; } + if (mini_htraced_write_conf_key(fp, "hrpc.address", "127.0.0.1:0")) { + goto ioerror; + } if (mini_htraced_write_conf_key(fp, "data.store.directories", "%s%c%s", ht->data_dir[0], PATH_LIST_SEP, ht->data_dir[1])) { goto ioerror; @@ -546,7 +549,7 @@ static void parse_startup_notification(struct mini_htraced *ht, char *err, size_t err_len) { struct json_tokener *tok = NULL; - struct json_object *root = NULL, *http_addr, *process_id; + struct json_object *root = NULL, *http_addr, *process_id, *hrpc_addr; int32_t pid; err[0] = '\0'; @@ -574,6 +577,18 @@ static void parse_startup_notification(struct mini_htraced *ht, snprintf(err, err_len, "OOM"); goto done; } + // Find the HRPC address, in the form of hostname:port, which the htraced + // is listening on. + if (!json_object_object_get_ex(root, "HrpcAddr", &hrpc_addr)) { + snprintf(err, err_len, "Failed to find HrpcAddr in the startup " + "notification."); + goto done; + } + ht->htraced_hrpc_addr = strdup(json_object_get_string(hrpc_addr)); + if (!ht->htraced_hrpc_addr) { + snprintf(err, err_len, "OOM"); + goto done; + } // Check that the process ID from the startup notification matches the // process ID from the fork. 
if (!json_object_object_get_ex(root, "ProcessId", &process_id)) { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/mini_htraced.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/mini_htraced.h b/htrace-c/src/test/mini_htraced.h index a803f55..df999cc 100644 --- a/htrace-c/src/test/mini_htraced.h +++ b/htrace-c/src/test/mini_htraced.h @@ -106,6 +106,11 @@ struct mini_htraced { * The HTTP address of the htraced, in hostname:port format. */ char *htraced_http_addr; + + /** + * The HRPC address of the htraced, in hostname:port format. + */ + char *htraced_hrpc_addr; }; /** http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/rtest.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/rtest.c b/htrace-c/src/test/rtest.c index 0ec5272..b299376 100644 --- a/htrace-c/src/test/rtest.c +++ b/htrace-c/src/test/rtest.c @@ -52,10 +52,16 @@ static void get_receiver_test_prid(char *prid, size_t prid_len) static int rtest_data_init(const char *conf_str, struct rtest_data **out) { + char *econf_str = NULL; struct rtest_data *rdata = calloc(1, sizeof(*(rdata))); EXPECT_NONNULL(rdata); - rdata->cnf = htrace_conf_from_strs(conf_str, - HTRACE_PROCESS_ID"=%{tname}/%{pid};sampler=always"); + if (asprintf(&econf_str, HTRACE_PROCESS_ID"=%%{tname}/%%{pid};sampler=always;" + "%s", conf_str) < 0) { + fprintf(stderr, "asprintf(econf_str) failed: OOM\n"); + return EXIT_FAILURE; + } + rdata->cnf = htrace_conf_from_str(econf_str); + free(econf_str); EXPECT_NONNULL(rdata->cnf); rdata->tracer = htracer_create(RECEIVER_TEST_TNAME, rdata->cnf); EXPECT_NONNULL(rdata->tracer); http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/string-unit.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/test/string-unit.c b/htrace-c/src/test/string-unit.c 
index 56155ea..f5ec729 100644 --- a/htrace-c/src/test/string-unit.c +++ b/htrace-c/src/test/string-unit.c @@ -16,7 +16,10 @@ * limitations under the License. */ +#include "core/conf.h" +#include "core/htrace.h" #include "test/test.h" +#include "util/log.h" #include "util/string.h" #include <errno.h> @@ -63,10 +66,52 @@ static int test_validate_json_string(void) return EXIT_SUCCESS; } +static int test_parse_endpoint(struct htrace_log *lg, const char *eremote, + int eport, const char *endpoint) +{ + char *remote = NULL; + int port = 0; + + EXPECT_INT_EQ(1, parse_endpoint(lg, endpoint, 80, &remote, &port)); + EXPECT_NONNULL(remote); + EXPECT_STR_EQ(eremote, remote); + EXPECT_INT_EQ(eport, port); + free(remote); + return EXIT_SUCCESS; +} + +static int test_parse_endpoints(void) +{ + struct htrace_conf *cnf; + struct htrace_log *lg; + + cnf = htrace_conf_from_str(""); + EXPECT_NONNULL(cnf); + lg = htrace_log_alloc(cnf); + EXPECT_NONNULL(lg); + EXPECT_INT_ZERO(test_parse_endpoint(lg, "", 80, + "")); + EXPECT_INT_ZERO(test_parse_endpoint(lg, "127.0.0.1", 8080, + "127.0.0.1:8080")); + EXPECT_INT_ZERO(test_parse_endpoint(lg, "127.0.0.1", 80, + "127.0.0.1")); + EXPECT_INT_ZERO(test_parse_endpoint(lg, "foobar.example.com", 99, + "foobar.example.com:99")); + EXPECT_INT_ZERO(test_parse_endpoint(lg, "foobar", 80, + "foobar")); + EXPECT_INT_ZERO(test_parse_endpoint(lg, + "2001:db8:85a3:8d3:1319:8a2e:370:7348", 9075, + "[2001:db8:85a3:8d3:1319:8a2e:370:7348]:9075")); + htrace_log_free(lg); + htrace_conf_free(cnf); + return EXIT_SUCCESS; +} + int main(void) { EXPECT_INT_ZERO(test_fwdprintf()); EXPECT_INT_ZERO(test_validate_json_string()); + EXPECT_INT_ZERO(test_parse_endpoints()); return EXIT_SUCCESS; } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/string.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/util/string.c b/htrace-c/src/util/string.c index e9a4e91..b465092 100644 --- 
a/htrace-c/src/util/string.c +++ b/htrace-c/src/util/string.c @@ -21,6 +21,7 @@ #include <stdarg.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> int fwdprintf(char **buf, int* rem, const char *fmt, ...) @@ -104,4 +105,58 @@ int validate_json_string(struct htrace_log *lg, const char *str) return 1; } +int parse_endpoint(struct htrace_log *lg, const char *endpoint, + int default_port, char **remote_out, int *port) +{ + const char *remotestr; + const char *portstr; + char *remote = NULL; + int remote_len; + + if (endpoint[0] == '[') { + remotestr = endpoint + 1; + remote_len = strcspn(remotestr, "]"); + if (remotestr[remote_len] != ']') { + htrace_log(lg, "parse_hostport: found open square bracket, but " + "not matching close square bracket.\n"); + return 0; + } + if (remotestr[remote_len + 1] == ':') { + portstr = remotestr + remote_len + 2; + } else { + portstr = NULL; + } + } else { + remotestr = endpoint; + remote_len = strcspn(remotestr, ":"); + if (remotestr[remote_len] == ':') { + portstr = remotestr + remote_len + 1; + } else { + portstr = NULL; + } + } + remote = malloc(remote_len + 1); + if (!remote) { + htrace_log(lg, "parse_hostport: unable to allocate %d-byte string.\n", + remote_len); + return 0; + } + memcpy(remote, remotestr, remote_len); + remote[remote_len] = '\0'; + if (!portstr) { + *port = default_port; + } else { + int p = atoi(portstr); + if ((p <= 0) || (p > 0xffff)) { + free(remote); + htrace_log(lg, "parse_hostport: parse port string '%s'\n", + portstr); + return 0; + } + *port = p; + } + *remote_out = remote; + return 1; +} + // vim: ts=4:sw=4:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/string.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/util/string.h b/htrace-c/src/util/string.h index 77e764b..a4c80b0 100644 --- a/htrace-c/src/util/string.h +++ b/htrace-c/src/util/string.h @@ -59,6 +59,28 @@ int fwdprintf(char **buf, int* rem, 
const char *fmt, ...) */ int validate_json_string(struct htrace_log *lg, const char *str); +/** + * Parse an endpoint string. + * + * We support a few different formats: + * Hostname/port: + * my.hostname:9075 + * ipv4/port: + * 127.0.0.1:9075 + * ipv6/port: + * [2001:db8:85a3:8d3:1319:8a2e:370:7348]:9075 + * + * @param lg The htrace log object. + * @param endpoint The endpoint string. + * @param default_port The default port to use if no port is given. + * @param remote (out param) The remote name. Malloced. + * @param port (out param) The port. + * + * @return 0 on failure; 1 on success. + */ +int parse_endpoint(struct htrace_log *lg, const char *endpoint, + int default_port, char **remote, int *port); + #endif // vim: ts=4:sw=4:et http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/time.c ---------------------------------------------------------------------- diff --git a/htrace-c/src/util/time.c b/htrace-c/src/util/time.c index d4ad8ed..7d02772 100644 --- a/htrace-c/src/util/time.c +++ b/htrace-c/src/util/time.c @@ -44,6 +44,14 @@ void ms_to_timespec(uint64_t ms, struct timespec *ts) ts->tv_nsec = ms * 1000000LLU; } +void ms_to_timeval(uint64_t ms, struct timeval *tv) +{ + uint64_t sec = ms / 1000LLU; + tv->tv_sec = sec; + ms -= (sec * 1000LLU); + tv->tv_usec = ms * 1000LLU; +} + uint64_t now_ms(struct htrace_log *lg) { struct timespec ts; http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/time.h ---------------------------------------------------------------------- diff --git a/htrace-c/src/util/time.h b/htrace-c/src/util/time.h index 9b4f5d4..21f4841 100644 --- a/htrace-c/src/util/time.h +++ b/htrace-c/src/util/time.h @@ -31,6 +31,7 @@ struct htrace_log; struct timespec; +struct timeval; /** * Convert a timespec into a time in milliseconds. 
@@ -50,6 +51,14 @@ uint64_t timespec_to_ms(const struct timespec *ts); void ms_to_timespec(uint64_t ms, struct timespec *ts); /** + * Convert a time in milliseconds into a timeval. + * + * @param ms The time in milliseconds to convert. + * @param tv (out param) The timeval to fill. + */ +void ms_to_timeval(uint64_t ms, struct timeval *tv); + +/** * Get the current wall-clock time in milliseconds. * * @param log The log to use for error messsages. http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/client/client.go b/htrace-htraced/src/go/src/org/apache/htrace/client/client.go index 44e2f69..6a62e81 100644 --- a/htrace-htraced/src/go/src/org/apache/htrace/client/client.go +++ b/htrace-htraced/src/go/src/org/apache/htrace/client/client.go @@ -37,13 +37,7 @@ import ( func NewClient(cnf *conf.Config) (*Client, error) { hcl := Client{} hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS) - if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" { - var err error - hcl.hcr, err = newHClient(cnf) - if err != nil { - return nil, err - } - } + hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS) return &hcl, nil } @@ -51,6 +45,9 @@ type Client struct { // REST address of the htraced server. restAddr string + // HRPC address of the htraced server. + hrpcAddr string + // The HRPC client, or null if it is not enabled. 
hcr *hClient } @@ -89,11 +86,15 @@ func (hcl *Client) FindSpan(sid common.SpanId) (*common.Span, error) { } func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error { - if hcl.hcr != nil { - return hcl.hcr.writeSpans(req) - } else { + if hcl.hrpcAddr == "" { return hcl.writeSpansHttp(req) } + hcr, err := newHClient(hcl.hrpcAddr) + if err != nil { + return err + } + defer hcr.Close() + return hcr.writeSpans(req) } func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go index 1730c02..5406d73 100644 --- a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go +++ b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go @@ -28,7 +28,6 @@ import ( "net" "net/rpc" "org/apache/htrace/common" - "org/apache/htrace/conf" ) type hClient struct { @@ -62,7 +61,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro Seq: req.Seq, Length: uint32(len(buf)), } - err = binary.Write(cdc.rwc, binary.BigEndian, &hdr) + err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr) if err != nil { return errors.New(fmt.Sprintf("Error writing header bytes: %s", err.Error())) @@ -77,7 +76,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error { hdr := common.HrpcResponseHeader{} - err := binary.Read(cdc.rwc, binary.BigEndian, &hdr) + err := binary.Read(cdc.rwc, binary.LittleEndian, &hdr) if err != nil { return errors.New(fmt.Sprintf("Error reading response header "+ "bytes: %s", err.Error())) @@ -129,13 +128,12 @@ func (cdc *HrpcClientCodec) Close() error { return cdc.rwc.Close() } -func 
newHClient(cnf *conf.Config) (*hClient, error) { +func newHClient(hrpcAddr string) (*hClient, error) { hcr := hClient{} - addr := cnf.Get(conf.HTRACE_HRPC_ADDRESS) - conn, err := net.Dial("tcp", addr) + conn, err := net.Dial("tcp", hrpcAddr) if err != nil { return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+ - "at %s: %s", addr, err.Error())) + "at %s: %s", hrpcAddr, err.Error())) } hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn}) return &hcr, nil http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go index 9696cbc..eede69e 100644 --- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go +++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go @@ -47,27 +47,50 @@ type HrpcServer struct { // Codec which encodes HRPC data via JSON type HrpcServerCodec struct { - rwc io.ReadWriteCloser + lg *common.Logger + conn net.Conn length uint32 } +func asJson(val interface{}) string { + js, err := json.Marshal(val) + if err != nil { + return "encoding error: " + err.Error() + } + return string(js) +} + +func createErrAndLog(lg *common.Logger, val string) error { + lg.Warnf("%s\n", val) + return errors.New(val) +} + func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { hdr := common.HrpcRequestHeader{} - err := binary.Read(cdc.rwc, binary.BigEndian, &hdr) + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Reading HRPC request header from %s\n", cdc.conn.RemoteAddr()) + } + err := binary.Read(cdc.conn, binary.LittleEndian, &hdr) if err != nil { - return errors.New(fmt.Sprintf("Error reading header bytes: %s", err.Error())) + return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading header bytes: %s", + err.Error())) + } + if 
cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Read HRPC request header %s from %s\n", + asJson(&hdr), cdc.conn.RemoteAddr()) } if hdr.Magic != common.HRPC_MAGIC { - return errors.New(fmt.Sprintf("Invalid request header: expected "+ + return createErrAndLog(cdc.lg, fmt.Sprintf("Invalid request header: expected "+ "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic)) } if hdr.Length > common.MAX_HRPC_BODY_LENGTH { - return errors.New(fmt.Sprintf("Length prefix was too long. Maximum "+ - "length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length)) + return createErrAndLog(cdc.lg, fmt.Sprintf("Length prefix was too long. "+ + "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, + hdr.Length)) } req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) if req.ServiceMethod == "" { - return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x", + return createErrAndLog(cdc.lg, fmt.Sprintf("Unknown MethodID code 0x%04x", hdr.MethodId)) } req.Seq = hdr.Seq @@ -76,11 +99,19 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { } func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error { - dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length))) + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n", + cdc.length, cdc.conn.RemoteAddr()) + } + dec := json.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length))) err := dec.Decode(body) if err != nil { - return errors.New(fmt.Sprintf("Failed to read request body: %s", - err.Error())) + return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to read request "+ + "body from %s: %s", cdc.conn.RemoteAddr(), err.Error())) + } + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("Read body from %s: %s\n", + cdc.conn.RemoteAddr(), asJson(&body)) } return nil } @@ -93,8 +124,8 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e if msg != nil { buf, err = json.Marshal(msg) if err != nil { - 
return errors.New(fmt.Sprintf("Failed to marshal response message: %s", - err.Error())) + return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to marshal "+ + "response message: %s", err.Error())) } } hdr := common.HrpcResponseHeader{} @@ -102,41 +133,41 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e hdr.Seq = resp.Seq hdr.ErrLength = uint32(len(resp.Error)) hdr.Length = uint32(len(buf)) - writer := bufio.NewWriterSize(cdc.rwc, 256) - err = binary.Write(writer, binary.BigEndian, &hdr) + writer := bufio.NewWriterSize(cdc.conn, 256) + err = binary.Write(writer, binary.LittleEndian, &hdr) if err != nil { - return errors.New(fmt.Sprintf("Failed to write response header: %s", - err.Error())) + return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write response "+ + "header: %s", err.Error())) } if hdr.ErrLength > 0 { _, err = io.WriteString(writer, resp.Error) if err != nil { - return errors.New(fmt.Sprintf("Failed to write error string: %s", - err.Error())) + return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write error "+ + "string: %s", err.Error())) } } if hdr.Length > 0 { var length int length, err = writer.Write(buf) if err != nil { - return errors.New(fmt.Sprintf("Failed to write response "+ + return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write response "+ "message: %s", err.Error())) } if uint32(length) != hdr.Length { - return errors.New(fmt.Sprintf("Failed to write all of response "+ - "message: %s", err.Error())) + return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write all of "+ + "response message: %s", err.Error())) } } err = writer.Flush() if err != nil { - return errors.New(fmt.Sprintf("Failed to write the response bytes: "+ - "%s", err.Error())) + return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write the response "+ + "bytes: %s", err.Error())) } return nil } func (cdc *HrpcServerCodec) Close() error { - return cdc.rwc.Close() + return cdc.conn.Close() } func (hand *HrpcHandler) WriteSpans(req 
*common.WriteSpansReq, @@ -148,7 +179,9 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, if span.ProcessId == "" { span.ProcessId = req.DefaultPid } - hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson()) + if hand.lg.TraceEnabled() { + hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson()) + } hand.store.WriteSpan(span) } return nil @@ -182,8 +215,12 @@ func (hsv *HrpcServer) run() { lg.Errorf("HRPC Accept error: %s\n", err.Error()) continue } + if lg.TraceEnabled() { + lg.Tracef("Accepted HRPC connection from %s\n", conn.RemoteAddr()) + } go hsv.ServeCodec(&HrpcServerCodec{ - rwc: conn, + lg: lg, + conn: conn, }) } }
