This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f214efe172ab4aaad6e938148a6c43db688227b9 Author: Li Zhanhui <[email protected]> AuthorDate: Thu Jul 14 16:14:57 2022 +0800 Remove deprecated features --- cpp/bazel/curl.bzl | 201 --------------------- cpp/bazel/rocketmq_deps.bzl | 12 -- cpp/src/main/cpp/base/BUILD.bazel | 1 - cpp/src/main/cpp/base/CredentialsProvider.cpp | 98 +--------- cpp/src/main/cpp/base/HostInfo.cpp | 77 -------- cpp/src/main/cpp/base/HttpClientImpl.cpp | 90 --------- cpp/src/main/cpp/base/TopAddressing.cpp | 76 -------- cpp/src/main/cpp/base/include/HostInfo.h | 47 ----- cpp/src/main/cpp/base/include/HttpClient.h | 53 ------ cpp/src/main/cpp/base/include/HttpClientImpl.h | 46 ----- .../cpp/base/include/StsCredentialsProviderImpl.h | 61 ------- cpp/src/main/cpp/base/include/TopAddressing.h | 52 ------ cpp/src/main/cpp/client/include/ClientManager.h | 1 - .../main/cpp/client/include/ClientManagerImpl.h | 52 ++++-- cpp/src/main/cpp/rocketmq/ClientImpl.cpp | 1 - .../cpp/rocketmq/DynamicNameServerResolver.cpp | 129 ------------- cpp/src/main/cpp/rocketmq/Producer.cpp | 1 - cpp/src/main/cpp/rocketmq/PushConsumer.cpp | 1 - .../rocketmq/include/DynamicNameServerResolver.h | 73 -------- cpp/third_party/cpp_httplib.BUILD | 17 -- cpp/third_party/curl.BUILD | 35 ---- 21 files changed, 39 insertions(+), 1085 deletions(-) diff --git a/cpp/bazel/curl.bzl b/cpp/bazel/curl.bzl deleted file mode 100644 index b83d054..0000000 --- a/cpp/bazel/curl.bzl +++ /dev/null @@ -1,201 +0,0 @@ -# Copyright 2018, OpenCensus Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Compiler options for building libcurl. - -BASE_CURL_COPTS = [ - # Disable everything else except HTTP protocol. - "-DHTTP_ONLY=1", - "-DENABLE_IPV6=1", - "-DGETHOSTNAME_TYPE_ARG2=size_t", - "-DGETSERVBYPORT_R_ARGS=6", - "-DGETSERVBYPORT_R_BUFSIZE=4096", - "-DHAVE_ALARM=1", - "-DHAVE_ALLOCA_H=1", - "-DHAVE_ARPA_INET_H=1", - "-DHAVE_ARPA_TFTP_H=1", - "-DHAVE_ASSERT_H=1", - "-DHAVE_BASENAME=1", - "-DHAVE_BOOL_T=1", - "-DHAVE_CLOCK_GETTIME_MONOTONIC=1", - "-DHAVE_CONNECT=1", - "-DHAVE_DLFCN_H=1", - "-DHAVE_ENGINE_LOAD_BUILTIN_ENGINES=1", - "-DHAVE_ERRNO_H=1", - "-DHAVE_FCNTL=1", - "-DHAVE_FCNTL_H=1", - "-DHAVE_FCNTL_O_NONBLOCK=1", - "-DHAVE_FDOPEN=1", - "-DHAVE_FREEADDRINFO=1", - "-DHAVE_FREEIFADDRS=1", - "-DHAVE_FSETXATTR=1", - "-DHAVE_FSETXATTR_5=1", - "-DHAVE_FTRUNCATE=1", - "-DHAVE_GAI_STRERROR=1", - "-DHAVE_GETADDRINFO=1", - "-DHAVE_GETADDRINFO_THREADSAFE=1", - "-DHAVE_GETEUID=1", - "-DHAVE_GETHOSTBYADDR=1", - "-DHAVE_GETHOSTBYADDR_R=1", - "-DHAVE_GETHOSTBYADDR_R_8=1", - "-DHAVE_GETHOSTBYNAME=1", - "-DHAVE_GETHOSTBYNAME_R=1", - "-DHAVE_GETHOSTBYNAME_R_6=1", - "-DHAVE_GETHOSTNAME=1", - "-DHAVE_GETIFADDRS=1", - "-DHAVE_GETNAMEINFO=1", - "-DHAVE_GETPPID=1", - "-DHAVE_GETPWUID=1", - "-DHAVE_GETPWUID_R=1", - "-DHAVE_GETRLIMIT=1", - "-DHAVE_GETSERVBYPORT_R=1", - "-DHAVE_GETTIMEOFDAY=1", - "-DHAVE_GMTIME_R=1", - "-DHAVE_IFADDRS_H=1", - "-DHAVE_IF_NAMETOINDEX=1", - "-DHAVE_INET_NTOP=1", - "-DHAVE_INET_PTON=1", - "-DHAVE_INTTYPES_H=1", - "-DHAVE_IOCTL=1", - "-DHAVE_IOCTL_FIONBIO=1", - "-DHAVE_IOCTL_SIOCGIFADDR=1", - "-DHAVE_LIBGEN_H=1", - "-DHAVE_LL=1", - "-DHAVE_LOCALE_H=1", - "-DHAVE_LOCALTIME_R=1", - "-DHAVE_LONGLONG=1", - "-DHAVE_MALLOC_H=1", - "-DHAVE_MEMORY_H=1", - "-DHAVE_NETDB_H=1", - "-DHAVE_NETINET_IN_H=1", - "-DHAVE_NETINET_TCP_H=1", - "-DHAVE_NET_IF_H=1", - "-DHAVE_PIPE=1", - "-DHAVE_POLL=1", - "-DHAVE_POLL_FINE=1", - "-DHAVE_POLL_H=1", - "-DHAVE_POSIX_STRERROR_R=1", - "-DHAVE_PTHREAD_H=1", - "-DHAVE_PWD_H=1", - "-DHAVE_RECV=1", - "-DHAVE_SELECT=1", - "-DHAVE_SEND=1", - "-DHAVE_SETJMP_H=1", - "-DHAVE_SETLOCALE=1", - "-DHAVE_SETRLIMIT=1", - "-DHAVE_SETSOCKOPT=1", - "-DHAVE_SGTTY_H=1", - "-DHAVE_SIGACTION=1", - "-DHAVE_SIGINTERRUPT=1", - "-DHAVE_SIGNAL=1", - "-DHAVE_SIGNAL_H=1", - "-DHAVE_SIGSETJMP=1", - "-DHAVE_SIG_ATOMIC_T=1", - "-DHAVE_SOCKADDR_IN6_SIN6_SCOPE_ID=1", - "-DHAVE_SOCKET=1", - "-DHAVE_SOCKETPAIR=1", - "-DHAVE_STDBOOL_H=1", - "-DHAVE_STDINT_H=1", - "-DHAVE_STDIO_H=1", - "-DHAVE_STDLIB_H=1", - "-DHAVE_STRCASECMP=1", - "-DHAVE_STRDUP=1", - "-DHAVE_STRERROR_R=1", - "-DHAVE_STRINGS_H=1", - "-DHAVE_STRING_H=1", - "-DHAVE_STRNCASECMP=1", - "-DHAVE_STRSTR=1", - "-DHAVE_STRTOK_R=1", - "-DHAVE_STRTOLL=1", - "-DHAVE_STRUCT_SOCKADDR_STORAGE=1", - "-DHAVE_STRUCT_TIMEVAL=1", - "-DHAVE_SYS_IOCTL_H=1", - "-DHAVE_SYS_PARAM_H=1", - "-DHAVE_SYS_POLL_H=1", - "-DHAVE_SYS_RESOURCE_H=1", - "-DHAVE_SYS_SELECT_H=1", - "-DHAVE_SYS_SOCKET_H=1", - "-DHAVE_SYS_STAT_H=1", - "-DHAVE_SYS_TIME_H=1", - "-DHAVE_SYS_TYPES_H=1", - "-DHAVE_SYS_UIO_H=1", - "-DHAVE_SYS_UN_H=1", - "-DHAVE_SYS_WAIT_H=1", - "-DHAVE_SYS_XATTR_H=1", - "-DHAVE_TERMIOS_H=1", - "-DHAVE_TERMIO_H=1", - "-DHAVE_TIME_H=1", - "-DHAVE_UNISTD_H=1", - "-DHAVE_UTIME=1", - "-DHAVE_UTIMES=1", - "-DHAVE_UTIME_H=1", - "-DHAVE_VARIADIC_MACROS_C99=1", - "-DHAVE_VARIADIC_MACROS_GCC=1", - "-DHAVE_WRITABLE_ARGV=1", - "-DHAVE_WRITEV=1", - "-DRECV_TYPE_ARG1=int", - "-DRECV_TYPE_ARG2=void*", - "-DRECV_TYPE_ARG3=size_t", - "-DRECV_TYPE_ARG4=int", - "-DRECV_TYPE_RETV=ssize_t", - "-DRETSIGTYPE=void", - "-DSELECT_QUAL_ARG5=", - "-DSELECT_TYPE_ARG1=int", - "-DSELECT_TYPE_ARG234=fd_set*", - "-DSELECT_TYPE_RETV=int", - "-DSEND_QUAL_ARG2=const", - "-DSEND_TYPE_ARG1=int", - "-DSEND_TYPE_ARG2=void*", - "-DSEND_TYPE_ARG3=size_t", - "-DSEND_TYPE_ARG4=int", - "-DSEND_TYPE_RETV=ssize_t", - "-DSIZEOF_CURL_OFF_T=8", - "-DSIZEOF_INT=4", - "-DSIZEOF_LONG=8", - "-DSIZEOF_OFF_T=8", - "-DSIZEOF_SHORT=2", - "-DSIZEOF_SIZE_T=8", - "-DSIZEOF_TIME_T=8", - "-DSTDC_HEADERS=1", - "-DSTRERROR_R_TYPE_ARG3=size_t", - "-DTIME_WITH_SYS_TIME=1", - "-DUSE_THREADS_POSIX=1", - "-DUSE_UNIX_SOCKETS=1", - - # Extra defines needed by curl - "-DBUILDING_LIBCURL", - "-DCURL_HIDDEN_SYMBOLS", -] - -LINUX_CURL_COPTS = [ - "-DHAVE_LINUX_TCP_H=1", - "-DHAVE_MSG_NOSIGNAL=1", -] - -CURL_COPTS = select({ - "//:windows": [ - # Disable everything else except HTTP protocol. - "/DHTTP_ONLY=1", - "/DCURL_STATICLIB", - "/DWIN32", - "/DBUILDING_LIBCURL", - "/DUSE_WIN32_IDN", - "/DWANT_IDN_PROTOTYPES", - "/DUSE_IPV6", - "/DUSE_WINDOWS_SSPI", - "/DUSE_SCHANNEL", - ], - "//:osx": BASE_CURL_COPTS, - "//conditions:default": BASE_CURL_COPTS + LINUX_CURL_COPTS, -}) diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl index 983a476..fd201a2 100644 --- a/cpp/bazel/rocketmq_deps.bzl +++ b/cpp/bazel/rocketmq_deps.bzl @@ -134,18 +134,6 @@ def rocketmq_deps(): ], ) - maybe( - http_archive, - name = "com_github_yhirose_cpp_httplib", - sha256 = "0ff62e28eb0f6e563178d44b77c94dddb8702141d83dd34b83cb046399c2b1d5", - build_file = "@org_apache_rocketmq//third_party:cpp_httplib.BUILD", - strip_prefix = "cpp-httplib-0.9.4", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/cpp-httplib/cpp-httplib-0.9.4.tar.gz", - "https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.9.4.tar.gz", - ], - ) - maybe( http_archive, name = "com_google_googleapis", diff --git a/cpp/src/main/cpp/base/BUILD.bazel b/cpp/src/main/cpp/base/BUILD.bazel index 959bba5..3768b14 100644 --- a/cpp/src/main/cpp/base/BUILD.bazel +++ b/cpp/src/main/cpp/base/BUILD.bazel @@ -40,7 +40,6 @@ cc_library( "@boringssl//:crypto", "@boringssl//:ssl", "//external:madler_zlib", - "@com_github_yhirose_cpp_httplib//:cpp_httplib", "@asio//:asio", ], ) \ No newline at end of file diff --git a/cpp/src/main/cpp/base/CredentialsProvider.cpp b/cpp/src/main/cpp/base/CredentialsProvider.cpp index 548e9d8..5a35bf1 100644 --- a/cpp/src/main/cpp/base/CredentialsProvider.cpp +++ b/cpp/src/main/cpp/base/CredentialsProvider.cpp @@ -14,22 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "rocketmq/CredentialsProvider.h" + #include <cstdlib> #include <fstream> #include <iostream> #include <string> +#include "MixAll.h" #include "absl/memory/memory.h" #include "absl/strings/match.h" #include "fmt/format.h" #include "ghc/filesystem.hpp" #include "google/protobuf/struct.pb.h" #include "google/protobuf/util/json_util.h" -#include "spdlog/spdlog.h" - -#include "MixAll.h" -#include "StsCredentialsProviderImpl.h" #include "rocketmq/Logger.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN @@ -117,94 +117,4 @@ Credentials ConfigFileCredentialsProvider::getCredentials() { return Credentials(access_key_, access_secret_); } -StsCredentialsProvider::StsCredentialsProvider(std::string ram_role_name) - : impl_(absl::make_unique<StsCredentialsProviderImpl>(std::move(ram_role_name))) { -} - -Credentials StsCredentialsProvider::getCredentials() { - return impl_->getCredentials(); -} - -StsCredentialsProviderImpl::StsCredentialsProviderImpl(std::string ram_role_name) - : ram_role_name_(std::move(ram_role_name)) { -} - -StsCredentialsProviderImpl::~StsCredentialsProviderImpl() { - http_client_->shutdown(); -} - -Credentials StsCredentialsProviderImpl::getCredentials() { - if (std::chrono::system_clock::now() >= expiration_) { - refresh(); - } - - { - absl::MutexLock lk(&mtx_); - return Credentials(access_key_, access_secret_, session_token_, expiration_); - } -} - -void StsCredentialsProviderImpl::refresh() { - std::string path = fmt::format("{}{}", RAM_ROLE_URL_PREFIX, ram_role_name_); - absl::Mutex sync_mtx; - absl::CondVar sync_cv; - bool completed = false; - auto callback = [&, this](int code, const std::multimap<std::string, std::string>& headers, const std::string& body) { - SPDLOG_DEBUG("Received STS response. Code: {}", code); - if (static_cast<int>(HttpStatus::OK) == code) { - google::protobuf::Struct doc; - google::protobuf::util::Status status = google::protobuf::util::JsonStringToMessage(body, &doc); - if (status.ok()) { - const auto& fields = doc.fields(); - assert(fields.contains(FIELD_ACCESS_KEY)); - std::string access_key = fields.at(FIELD_ACCESS_KEY).string_value(); - assert(fields.contains(FIELD_ACCESS_SECRET)); - std::string access_secret = fields.at(FIELD_ACCESS_SECRET).string_value(); - assert(fields.contains(FIELD_SESSION_TOKEN)); - std::string session_token = fields.at(FIELD_SESSION_TOKEN).string_value(); - assert(fields.contains(FIELD_EXPIRATION)); - std::string expiration_string = fields.at(FIELD_EXPIRATION).string_value(); - absl::Time expiration_instant; - std::string parse_error; - if (absl::ParseTime(EXPIRATION_DATE_TIME_FORMAT, expiration_string, absl::UTCTimeZone(), &expiration_instant, - &parse_error)) { - absl::MutexLock lk(&mtx_); - access_key_ = std::move(access_key); - access_secret_ = std::move(access_secret); - session_token_ = std::move(session_token); - expiration_ = absl::ToChronoTime(expiration_instant); - } else { - SPDLOG_WARN("Failed to parse expiration time. Message: {}", parse_error); - } - - } else { - SPDLOG_WARN("Failed to parse STS response. Message: {}", status.message().as_string()); - } - } else { - SPDLOG_WARN("STS response code is not OK. Code: {}", code); - } - - { - absl::MutexLock lk(&sync_mtx); - completed = true; - sync_cv.Signal(); - } - }; - - http_client_->get(HttpProtocol::HTTP, RAM_ROLE_HOST, 80, path, callback); - - while (!completed) { - absl::MutexLock lk(&sync_mtx); - sync_cv.Wait(&sync_mtx); - } -} - -const char* StsCredentialsProviderImpl::RAM_ROLE_HOST = "100.100.100.200"; -const char* StsCredentialsProviderImpl::RAM_ROLE_URL_PREFIX = "/latest/meta-data/Ram/security-credentials/"; -const char* StsCredentialsProviderImpl::FIELD_ACCESS_KEY = "AccessKeyId"; -const char* StsCredentialsProviderImpl::FIELD_ACCESS_SECRET = "AccessKeySecret"; -const char* StsCredentialsProviderImpl::FIELD_SESSION_TOKEN = "SecurityToken"; -const char* StsCredentialsProviderImpl::FIELD_EXPIRATION = "Expiration"; -const char* StsCredentialsProviderImpl::EXPIRATION_DATE_TIME_FORMAT = "%Y-%m-%d%ET%H:%H:%S%Ez"; - ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/HostInfo.cpp b/cpp/src/main/cpp/base/HostInfo.cpp deleted file mode 100644 index 8b57247..0000000 --- a/cpp/src/main/cpp/base/HostInfo.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "HostInfo.h" -#include "absl/strings/match.h" -#include "rocketmq/RocketMQ.h" -#include <cstdlib> -#include <cstring> - -ROCKETMQ_NAMESPACE_BEGIN - -const char* HostInfo::ENV_LABEL_SITE = "SIGMA_APP_SITE"; -const char* HostInfo::ENV_LABEL_UNIT = "SIGMA_APP_UNIT"; -const char* HostInfo::ENV_LABEL_APP = "SIGMA_APP_NAME"; -const char* HostInfo::ENV_LABEL_STAGE = "SIGMA_APP_STAGE"; - -HostInfo::HostInfo() { - getEnv(ENV_LABEL_SITE, site_); - getEnv(ENV_LABEL_UNIT, unit_); - getEnv(ENV_LABEL_APP, app_); - getEnv(ENV_LABEL_STAGE, stage_); -} - -void HostInfo::getEnv(const char* env, std::string& holder) { - if (!strlen(env)) { - return; - } - - char* value = getenv(env); - if (nullptr != value) { - holder.clear(); - holder.append(value); - } -} - -bool HostInfo::hasHostInfo() const { - return !unit_.empty() && !stage_.empty(); -} - -std::string HostInfo::queryString() const { - if (!hasHostInfo()) { - return std::string(); - } - - std::string query_string("labels="); - appendLabel(query_string, "site", site_); - appendLabel(query_string, "unit", unit_); - appendLabel(query_string, "app", app_); - appendLabel(query_string, "stage", stage_); - return query_string; -} - -void HostInfo::appendLabel(std::string& query_string, const char* key, const std::string& value) { - if (value.empty()) { - return; - } - - if (absl::EndsWith(query_string, "=")) { - query_string.append(key).append(":").append(value); - } else { - query_string.append(",").append(key).append(":").append(value); - } -} -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/HttpClientImpl.cpp b/cpp/src/main/cpp/base/HttpClientImpl.cpp deleted file mode 100644 index cb25ae9..0000000 --- a/cpp/src/main/cpp/base/HttpClientImpl.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "HttpClientImpl.h" - -#include <memory> -#include <string> - -#include "fmt/format.h" -#include "spdlog/spdlog.h" - -#include "rocketmq/RocketMQ.h" - -ROCKETMQ_NAMESPACE_BEGIN - -void HttpClientImpl::start() { -} - -void HttpClientImpl::shutdown() { -} - -/** - * @brief We current implement this function in sync mode since async http request in CURL is sort of unnecessarily - * complex. - * - * @param protocol - * @param host - * @param port - * @param path - * @param cb - */ -void HttpClientImpl::get( - HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path, - const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) { - - std::string key; - switch (protocol) { - case HttpProtocol::HTTP: - key = fmt::format("http://{}:{}", host, port); - break; - case HttpProtocol::HTTPS: - key = fmt::format("https://{}:{}", host, port); - break; - } - - std::shared_ptr<httplib::Client> client; - { - absl::MutexLock lk(&clients_mtx_); - if (clients_.contains(key)) { - client = clients_[key]; - } - - if (!client || !client->is_valid()) { - client = std::make_shared<httplib::Client>(key); - clients_.insert_or_assign(key, client); - } - } - - if (!client || !client->is_valid()) { - int code = 400; - std::multimap<std::string, std::string> headers; - std::string response; - cb(code, headers, response); - return; - } - - auto res = client->Get(path.c_str()); - - std::multimap<std::string, std::string> headers; - for (auto& header : headers) { - headers.insert({header.first, header.second}); - } - - cb(res->status, headers, res->body); -} - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/TopAddressing.cpp b/cpp/src/main/cpp/base/TopAddressing.cpp deleted file mode 100644 index dd45e67..0000000 --- a/cpp/src/main/cpp/base/TopAddressing.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "TopAddressing.h" - -#include "HttpClientImpl.h" -#include "absl/memory/memory.h" -#include "absl/strings/match.h" -#include "absl/strings/str_split.h" -#include "spdlog/spdlog.h" -#include <utility> - -ROCKETMQ_NAMESPACE_BEGIN - -TopAddressing::TopAddressing() : TopAddressing("jmenv.tbsite.net", 8080, "/rocketmq/nsaddr") { -} - -TopAddressing::TopAddressing(std::string host, int port, std::string path) - : host_(std::move(host)), port_(port), path_(std::move(path)), http_client_(absl::make_unique<HttpClientImpl>()) { - http_client_->start(); -} - -TopAddressing::~TopAddressing() { - http_client_->shutdown(); -} - -void TopAddressing::fetchNameServerAddresses(const std::function<void(bool, const std::vector<std::string>&)>& cb) { - SPDLOG_DEBUG("Prepare to send HTTP request, timeout=3s"); - std::string base(fmt::format("http://{}:{}", host_, port_)); - // Append host info if necessary. - std::string query_string(path_); - - if (absl::StrContains(query_string, "?")) { - query_string.append("&"); - } else { - query_string.append("?"); - } - - if (host_info_.hasHostInfo()) { - query_string.append(host_info_.queryString()); - } else { - query_string.append("nofix=1"); - } - - auto callback = [cb](int code, const std::multimap<std::string, std::string>& metadata, const std::string& body) { - SPDLOG_DEBUG("Receive HTTP response. Code: {}, body: {}", code, body); - if (static_cast<int>(HttpStatus::OK) == code) { - cb(true, absl::StrSplit(body, ';')); - } else { - std::vector<std::string> name_server_list; - cb(false, name_server_list); - } - }; - - http_client_->get(HttpProtocol::HTTP, host_, port_, query_string, callback); -} - -void TopAddressing::injectHttpClient(std::unique_ptr<HttpClient> http_client) { - http_client_->shutdown(); - http_client_.swap(http_client); -} - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/include/HostInfo.h b/cpp/src/main/cpp/base/include/HostInfo.h deleted file mode 100644 index 1b2a543..0000000 --- a/cpp/src/main/cpp/base/include/HostInfo.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include <string> - -#include "rocketmq/RocketMQ.h" - -ROCKETMQ_NAMESPACE_BEGIN -struct HostInfo { - std::string site_; - std::string unit_; - std::string app_; - std::string stage_; - - explicit HostInfo(); - - bool hasHostInfo() const; - - std::string queryString() const; - - static const char* ENV_LABEL_SITE; - static const char* ENV_LABEL_UNIT; - static const char* ENV_LABEL_APP; - static const char* ENV_LABEL_STAGE; - -private: - static void getEnv(const char* env, std::string& holder); - - static void appendLabel(std::string& query_string, const char* key, const std::string& value); -}; - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/include/HttpClient.h b/cpp/src/main/cpp/base/include/HttpClient.h deleted file mode 100644 index 77f6955..0000000 --- a/cpp/src/main/cpp/base/include/HttpClient.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include <cstdint> -#include <functional> -#include <map> -#include <string> - -#include "rocketmq/RocketMQ.h" - -ROCKETMQ_NAMESPACE_BEGIN - -enum class HttpProtocol : int8_t -{ - HTTP = 1, - HTTPS = 2, -}; - -enum class HttpStatus : int -{ - OK = 200, - INTERNAL = 500, -}; - -class HttpClient { -public: - virtual ~HttpClient() = default; - - virtual void start() = 0; - - virtual void shutdown() = 0; - - virtual void - get(HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path, - const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) = 0; -}; - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/include/HttpClientImpl.h b/cpp/src/main/cpp/base/include/HttpClientImpl.h deleted file mode 100644 index e54bb43..0000000 --- a/cpp/src/main/cpp/base/include/HttpClientImpl.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "absl/base/thread_annotations.h" -#include "absl/container/flat_hash_map.h" -#include "absl/synchronization/mutex.h" -#include "httplib.h" - -#include "HttpClient.h" -#include "rocketmq/RocketMQ.h" - -ROCKETMQ_NAMESPACE_BEGIN - -class HttpClientImpl : public HttpClient { -public: - ~HttpClientImpl() override = default; - - void start() override; - - void shutdown() override; - - void - get(HttpProtocol protocol, const std::string& host, std::uint16_t port, const std::string& path, - const std::function<void(int, const std::multimap<std::string, std::string>&, const std::string&)>& cb) override; - -private: - absl::flat_hash_map<std::string, std::shared_ptr<httplib::Client>> clients_ GUARDED_BY(clients_mtx_); - absl::Mutex clients_mtx_; -}; - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/include/StsCredentialsProviderImpl.h b/cpp/src/main/cpp/base/include/StsCredentialsProviderImpl.h deleted file mode 100644 index 0789e4a..0000000 --- a/cpp/src/main/cpp/base/include/StsCredentialsProviderImpl.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "absl/base/thread_annotations.h" -#include "absl/synchronization/mutex.h" - -#include "HttpClient.h" -#include "rocketmq/CredentialsProvider.h" - -ROCKETMQ_NAMESPACE_BEGIN - -class StsCredentialsProviderImpl : public CredentialsProvider { -public: - explicit StsCredentialsProviderImpl(std::string ram_role_name); - - ~StsCredentialsProviderImpl() override; - - Credentials getCredentials() override; - - void withHttpClient(std::unique_ptr<HttpClient> http_client) { - http_client_ = std::move(http_client); - } - -private: - static const char* RAM_ROLE_HOST; - static const char* RAM_ROLE_URL_PREFIX; - static const char* FIELD_ACCESS_KEY; - static const char* FIELD_ACCESS_SECRET; - static const char* FIELD_SESSION_TOKEN; - static const char* FIELD_EXPIRATION; - static const char* EXPIRATION_DATE_TIME_FORMAT; - - std::string ram_role_name_; - - std::string access_key_ GUARDED_BY(mtx_); - std::string access_secret_ GUARDED_BY(mtx_); - std::string session_token_ GUARDED_BY(mtx_); - std::chrono::system_clock::time_point expiration_; - - absl::Mutex mtx_; - void refresh() LOCKS_EXCLUDED(mtx_); - - std::unique_ptr<HttpClient> http_client_; -}; - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/base/include/TopAddressing.h b/cpp/src/main/cpp/base/include/TopAddressing.h deleted file mode 100644 index 0260259..0000000 --- a/cpp/src/main/cpp/base/include/TopAddressing.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include <cassert> -#include <cstdlib> -#include <functional> -#include <memory> -#include <string> -#include <vector> - -#include "HostInfo.h" -#include "HttpClient.h" - -ROCKETMQ_NAMESPACE_BEGIN - -class TopAddressing { -public: - TopAddressing(); - - TopAddressing(std::string host, int port, std::string path); - - virtual ~TopAddressing(); - - void fetchNameServerAddresses(const std::function<void(bool, const std::vector<std::string>&)>& cb); - - void injectHttpClient(std::unique_ptr<HttpClient> http_client); - -private: - std::string host_; - int port_{8080}; - std::string path_; - HostInfo host_info_; - - std::unique_ptr<HttpClient> http_client_; -}; - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/client/include/ClientManager.h b/cpp/src/main/cpp/client/include/ClientManager.h index 2ec5b2e..56325fa 100644 --- a/cpp/src/main/cpp/client/include/ClientManager.h +++ b/cpp/src/main/cpp/client/include/ClientManager.h @@ -28,7 +28,6 @@ #include "RpcClient.h" #include "Scheduler.h" #include "TelemetryBidiReactor.h" -#include "TopAddressing.h" #include "TopicRouteData.h" #include "rocketmq/SendCallback.h" #include "rocketmq/State.h" diff --git a/cpp/src/main/cpp/client/include/ClientManagerImpl.h b/cpp/src/main/cpp/client/include/ClientManagerImpl.h index 63b0713..9754c6f 100644 --- a/cpp/src/main/cpp/client/include/ClientManagerImpl.h +++ b/cpp/src/main/cpp/client/include/ClientManagerImpl.h @@ -40,7 +40,6 @@ #include "SendMessageContext.h" #include "TelemetryBidiReactor.h" #include "ThreadPoolImpl.h" -#include "TopAddressing.h" #include "TopicRouteChangeCallback.h" #include "TopicRouteData.h" #include "absl/base/thread_annotations.h" @@ -81,7 +80,9 @@ public: * @param timeout RPC timeout. * @param cb Callback to execute once the request/response completes. */ - void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request, + void resolveRoute(const std::string& target_host, + const Metadata& metadata, + const QueryRouteRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); @@ -92,7 +93,9 @@ public: */ std::vector<std::string> cleanOfflineRpcClients() LOCKS_EXCLUDED(clients_mtx_, rpc_clients_mtx_); - bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, + bool send(const std::string& target_host, + const Metadata& metadata, + SendMessageRequest& request, SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); /** @@ -109,7 +112,8 @@ public: RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true) override LOCKS_EXCLUDED(rpc_clients_mtx_); - static SendReceipt processSendResponse(const rmq::MessageQueue& message_queue, const SendMessageResponse& response, + static SendReceipt processSendResponse(const rmq::MessageQueue& message_queue, + const SendMessageResponse& response, std::error_code& ec); // only for test @@ -120,13 +124,17 @@ public: void addClientObserver(std::weak_ptr<Client> client) override; - void queryAssignment(const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request, + void queryAssignment(const std::string& target, + const Metadata& metadata, + const QueryAssignmentRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) override; - void receiveMessage(const std::string& target, const Metadata& metadata, const ReceiveMessageRequest& request, - std::chrono::milliseconds timeout, ReceiveMessageCallback cb) override - LOCKS_EXCLUDED(rpc_clients_mtx_); + void receiveMessage(const std::string& target, + const Metadata& metadata, + const ReceiveMessageRequest& request, + std::chrono::milliseconds timeout, + ReceiveMessageCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); /** * Translate protobuf message struct to domain model. @@ -143,11 +151,16 @@ public: * @param target_host Target broker host address. * @param request Ack message request. */ - void ack(const std::string& target_host, const Metadata& metadata, const AckMessageRequest& request, - std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) override; - - void changeInvisibleDuration(const std::string& target_host, const Metadata& metadata, - const ChangeInvisibleDurationRequest&, std::chrono::milliseconds timeout, + void ack(const std::string& target_host, + const Metadata& metadata, + const AckMessageRequest& request, + std::chrono::milliseconds timeout, + const std::function<void(const std::error_code&)>& cb) override; + + void changeInvisibleDuration(const std::string& target_host, + const Metadata& metadata, + const ChangeInvisibleDurationRequest&, + std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>&) override; void forwardMessageToDeadLetterQueue(const std::string& target_host, @@ -170,11 +183,14 @@ public: * @param timeout * @param cb */ - void endTransaction(const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request, + void endTransaction(const std::string& target_host, + const Metadata& metadata, + const EndTransactionRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) override; - std::error_code notifyClientTermination(const std::string& target_host, const Metadata& metadata, + std::error_code notifyClientTermination(const std::string& target_host, + const Metadata& metadata, const NotifyClientTerminationRequest& request, std::chrono::milliseconds timeout) override; @@ -182,7 +198,9 @@ public: trace_ = trace; } - void heartbeat(const std::string& target_host, const Metadata& metadata, const HeartbeatRequest& request, + void heartbeat(const std::string& target_host, + const Metadata& metadata, + const HeartbeatRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) override; @@ -208,7 +226,7 @@ private: absl::Mutex clients_mtx_; absl::flat_hash_map<std::string, std::shared_ptr<RpcClient>> rpc_clients_ GUARDED_BY(rpc_clients_mtx_); - absl::Mutex rpc_clients_mtx_; // protects rpc_clients_ + absl::Mutex rpc_clients_mtx_; // protects rpc_clients_ std::uint32_t heartbeat_task_id_{0}; std::uint32_t stats_task_id_{0}; diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp index 61e6ed9..190b9c2 100644 --- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp +++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp @@ -32,7 +32,6 @@ #include <utility> #include "ClientManagerFactory.h" -#include "HttpClientImpl.h" #include "InvocationContext.h" #include "LoggerImpl.h" #include "MessageExt.h" diff --git a/cpp/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp b/cpp/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp deleted file mode 100644 index f9981bc..0000000 --- a/cpp/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "DynamicNameServerResolver.h" - -#include <atomic> -#include <chrono> -#include <cstdint> -#include <functional> -#include <memory> - -#include "absl/strings/str_join.h" - -#include "LoggerImpl.h" -#include "SchedulerImpl.h" - -ROCKETMQ_NAMESPACE_BEGIN - -DynamicNameServerResolver::DynamicNameServerResolver(absl::string_view endpoint, - std::chrono::milliseconds refresh_interval) - : endpoint_(endpoint.data(), endpoint.length()), scheduler_(std::make_shared<SchedulerImpl>(1)), - refresh_interval_(refresh_interval) { - absl::string_view remains; - if (absl::StartsWith(endpoint_, "https://")) { - ssl_ = true; - remains = absl::StripPrefix(endpoint_, "https://"); - } else { - remains = absl::StripPrefix(endpoint_, "http://"); - } - - std::int32_t port = 80; - if (ssl_) { - port = 443; - } - - absl::string_view host; - if (absl::StrContains(remains, ':')) { - std::vector<absl::string_view> segments = absl::StrSplit(remains, ':'); - host = segments[0]; - remains = absl::StripPrefix(remains, host); - remains = absl::StripPrefix(remains, ":"); - - segments = absl::StrSplit(remains, '/'); - if (!absl::SimpleAtoi(segments[0], &port)) { - SPDLOG_WARN("Failed to parse port of name-server-list discovery service endpoint"); - abort(); - } - remains = absl::StripPrefix(remains, segments[0]); - } else { - std::vector<absl::string_view> segments = absl::StrSplit(remains, '/'); - host = segments[0]; - remains = absl::StripPrefix(remains, host); - } - - top_addressing_ = absl::make_unique<TopAddressing>(std::string(host.data(), host.length()), port, - std::string(remains.data(), remains.length())); -} - -std::string DynamicNameServerResolver::resolve() { - bool fetch_immediately = false; - { - absl::MutexLock lk(&name_server_list_mtx_); - if (name_server_list_.empty()) { - fetch_immediately = true; - } - } - - if (fetch_immediately) { - fetch(); - } - - { - absl::MutexLock lk(&name_server_list_mtx_); - return naming_scheme_.buildAddress(name_server_list_); - } -} - -void DynamicNameServerResolver::fetch() { - std::weak_ptr<DynamicNameServerResolver> ptr(shared_from_this()); - auto callback = [ptr](bool success, const std::vector<std::string>& name_server_list) { - if (success && !name_server_list.empty()) { - std::shared_ptr<DynamicNameServerResolver> resolver = ptr.lock(); - if (resolver) { - resolver->onNameServerListFetched(name_server_list); - } - } - }; - top_addressing_->fetchNameServerAddresses(callback); -} - -void DynamicNameServerResolver::onNameServerListFetched(const std::vector<std::string>& name_server_list) { - if (!name_server_list.empty()) { - absl::MutexLock lk(&name_server_list_mtx_); - if (name_server_list_ != name_server_list) { - SPDLOG_INFO("Name server list changed. {} --> {}", absl::StrJoin(name_server_list_, ";"), - absl::StrJoin(name_server_list, ";")); - name_server_list_ = name_server_list; - } - } -} - -void DynamicNameServerResolver::injectHttpClient(std::unique_ptr<HttpClient> http_client) { - top_addressing_->injectHttpClient(std::move(http_client)); -} - -void DynamicNameServerResolver::start() { - scheduler_->start(); - scheduler_->schedule(std::bind(&DynamicNameServerResolver::fetch, this), "DynamicNameServerResolver", - std::chrono::milliseconds(0), refresh_interval_); -} - -void DynamicNameServerResolver::shutdown() { - scheduler_->shutdown(); -} - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/rocketmq/Producer.cpp b/cpp/src/main/cpp/rocketmq/Producer.cpp index b2bc5e0..cf2b4da 100644 --- a/cpp/src/main/cpp/rocketmq/Producer.cpp +++ b/cpp/src/main/cpp/rocketmq/Producer.cpp @@ -21,7 +21,6 @@ #include <system_error> #include <utility> -#include "DynamicNameServerResolver.h" #include "LoggerImpl.h" #include "MixAll.h" #include "ProducerImpl.h" diff --git a/cpp/src/main/cpp/rocketmq/PushConsumer.cpp b/cpp/src/main/cpp/rocketmq/PushConsumer.cpp index daabf01..17ea8ca 100644 --- a/cpp/src/main/cpp/rocketmq/PushConsumer.cpp +++ b/cpp/src/main/cpp/rocketmq/PushConsumer.cpp @@ -17,7 +17,6 @@ #include <chrono> #include <memory> -#include "DynamicNameServerResolver.h" #include "PushConsumerImpl.h" #include "StaticNameServerResolver.h" #include "rocketmq/PushConsumer.h" diff --git a/cpp/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h b/cpp/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h deleted file mode 100644 index 5bc2c53..0000000 --- a/cpp/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include <atomic> -#include <chrono> -#include <cstdint> -#include <cstdlib> -#include <memory> - -#include "absl/base/thread_annotations.h" -#include "absl/memory/memory.h" -#include "absl/strings/numbers.h" -#include "absl/strings/str_split.h" -#include "absl/strings/string_view.h" -#include "absl/synchronization/mutex.h" - -#include "NameServerResolver.h" -#include "NamingScheme.h" -#include "Scheduler.h" -#include "TopAddressing.h" - -ROCKETMQ_NAMESPACE_BEGIN - -class DynamicNameServerResolver : public NameServerResolver, - public std::enable_shared_from_this<DynamicNameServerResolver> { -public: - DynamicNameServerResolver(absl::string_view endpoint, std::chrono::milliseconds refresh_interval); - - void start() override; - - void shutdown() override; - - std::string resolve() override LOCKS_EXCLUDED(name_server_list_mtx_); - - void injectHttpClient(std::unique_ptr<HttpClient> http_client); - -private: - std::string endpoint_; - - SchedulerSharedPtr scheduler_; - std::chrono::milliseconds refresh_interval_; - - void fetch(); - - void onNameServerListFetched(const std::vector<std::string>& name_server_list) LOCKS_EXCLUDED(name_server_list_mtx_); - - std::vector<std::string> name_server_list_ GUARDED_BY(name_server_list_mtx_); - absl::Mutex name_server_list_mtx_; - - std::atomic<std::uint32_t> index_{0}; - - bool ssl_{false}; - std::unique_ptr<TopAddressing> top_addressing_; - - NamingScheme naming_scheme_; -}; - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/third_party/cpp_httplib.BUILD b/cpp/third_party/cpp_httplib.BUILD deleted file mode 100644 index cf425f7..0000000 --- a/cpp/third_party/cpp_httplib.BUILD +++ /dev/null @@ -1,17 +0,0 @@ -load("@rules_cc//cc:defs.bzl", "cc_library") - -cc_library( - name = "cpp_httplib", - hdrs = [ - "httplib.h", - ], - visibility = [ - "//visibility:public", - ], - deps = [ - "//external:madler_zlib", - ], - defines = [ - "CPPHTTPLIB_ZLIB_SUPPORT", - ], -) \ No newline at end of file diff --git a/cpp/third_party/curl.BUILD b/cpp/third_party/curl.BUILD deleted file mode 100644 index 4b39657..0000000 --- a/cpp/third_party/curl.BUILD +++ /dev/null @@ -1,35 +0,0 @@ -load("@org_apache_rocketmq//bazel:curl.bzl", "CURL_COPTS") - -package(features = ["no_copts_tokenization"]) - -config_setting( - name = "windows", - values = {"cpu": "x64_windows"}, - visibility = ["//visibility:private"], -) - -config_setting( - name = "osx", - values = {"cpu": "darwin"}, - visibility = ["//visibility:private"], -) - -cc_library( - name = "curl", - srcs = glob([ - "lib/**/*.c", - ]), - hdrs = glob([ - "include/curl/*.h", - "lib/**/*.h", - ]), - copts = CURL_COPTS + [ - "-DOS=\"os\"", - "-DCURL_EXTERN_SYMBOL=__attribute__((__visibility__(\"default\")))", - ], - includes = [ - "include/", - "lib/", - ], - visibility = ["//visibility:public"], -)
