This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new bcb42862 Support timed connect for both bthread and pthread (#2524)
bcb42862 is described below
commit bcb42862a2d4a584e83afda1dc055d7d7caa6f7d
Author: Bright Chen <[email protected]>
AuthorDate: Mon Feb 26 10:46:03 2024 +0800
Support timed connect for both bthread and pthread (#2524)
---
src/bthread/fd.cpp | 125 +++++++++++++++++------------------
src/bthread/unstable.h | 4 ++
src/butil/endpoint.cpp | 146 +++++++++++++++++++++++++++++++++++++++--
src/butil/endpoint.h | 5 ++
src/butil/fd_utility.cpp | 5 ++
src/butil/fd_utility.h | 3 +
src/butil/memory/scope_guard.h | 88 +++++++++++++++++++++++++
test/bthread_fd_unittest.cpp | 39 +++++++++++
test/endpoint_unittest.cpp | 56 ++++++++++++++++
9 files changed, 402 insertions(+), 69 deletions(-)
diff --git a/src/bthread/fd.cpp b/src/bthread/fd.cpp
index f26cbd07..e97cee2c 100644
--- a/src/bthread/fd.cpp
+++ b/src/bthread/fd.cpp
@@ -31,10 +31,15 @@
#include "butil/fd_utility.h" // make_non_blocking
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h" // fmix32
+#include "butil/memory/scope_guard.h"
#include "bthread/butex.h" // butex_*
#include "bthread/task_group.h" // TaskGroup
#include "bthread/bthread.h" // bthread_start_urgent
+namespace butil {
+extern int pthread_fd_wait(int fd, unsigned events, const timespec* abstime);
+}
+
// Implement bthread functions on file descriptors
namespace bthread {
@@ -422,69 +427,10 @@ int stop_and_join_epoll_threads() {
return rc;
}
-#if defined(OS_LINUX)
-short epoll_to_poll_events(uint32_t epoll_events) {
- // Most POLL* and EPOLL* are same values.
- short poll_events = (epoll_events &
- (EPOLLIN | EPOLLPRI | EPOLLOUT |
- EPOLLRDNORM | EPOLLRDBAND |
- EPOLLWRNORM | EPOLLWRBAND |
- EPOLLMSG | EPOLLERR | EPOLLHUP));
- CHECK_EQ((uint32_t)poll_events, epoll_events);
- return poll_events;
-}
-#elif defined(OS_MACOSX)
-static short kqueue_to_poll_events(int kqueue_events) {
- //TODO: add more values?
- short poll_events = 0;
- if (kqueue_events == EVFILT_READ) {
- poll_events |= POLLIN;
- }
- if (kqueue_events == EVFILT_WRITE) {
- poll_events |= POLLOUT;
- }
- return poll_events;
-}
-#endif
-
// For pthreads.
int pthread_fd_wait(int fd, unsigned events,
const timespec* abstime) {
- int diff_ms = -1;
- if (abstime) {
- timespec now;
- clock_gettime(CLOCK_REALTIME, &now);
- int64_t now_us = butil::timespec_to_microseconds(now);
- int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
- if (abstime_us <= now_us) {
- errno = ETIMEDOUT;
- return -1;
- }
- diff_ms = (abstime_us - now_us + 999L) / 1000L;
- }
-#if defined(OS_LINUX)
- const short poll_events = bthread::epoll_to_poll_events(events);
-#elif defined(OS_MACOSX)
- const short poll_events = bthread::kqueue_to_poll_events(events);
-#endif
- if (poll_events == 0) {
- errno = EINVAL;
- return -1;
- }
- pollfd ufds = { fd, poll_events, 0 };
- const int rc = poll(&ufds, 1, diff_ms);
- if (rc < 0) {
- return -1;
- }
- if (rc == 0) {
- errno = ETIMEDOUT;
- return -1;
- }
- if (ufds.revents & POLLNVAL) {
- errno = EBADF;
- return -1;
- }
- return 0;
+ return butil::pthread_fd_wait(fd, events, abstime);
}
} // namespace bthread
@@ -527,9 +473,19 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
if (NULL == g || g->is_current_pthread_task()) {
return ::connect(sockfd, serv_addr, addrlen);
}
- // FIXME: Scoped non-blocking?
- butil::make_non_blocking(sockfd);
- const int rc = connect(sockfd, serv_addr, addrlen);
+
+ bool is_blocking = butil::is_blocking(sockfd);
+ if (is_blocking) {
+ butil::make_non_blocking(sockfd);
+ }
+ // Scoped non-blocking.
+ auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
+ if (is_blocking) {
+ butil::make_blocking(sockfd);
+ }
+ });
+
+ const int rc = ::connect(sockfd, serv_addr, addrlen);
if (rc == 0 || errno != EINPROGRESS) {
return rc;
}
@@ -554,6 +510,49 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
return 0;
}
+int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+ socklen_t addrlen, const timespec* abstime) {
+ if (!abstime) {
+ return bthread_connect(sockfd, serv_addr, addrlen);
+ }
+
+ bool is_blocking = butil::is_blocking(sockfd);
+ if (is_blocking) {
+ butil::make_non_blocking(sockfd);
+ }
+ // Scoped non-blocking.
+ auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
+ if (is_blocking) {
+ butil::make_blocking(sockfd);
+ }
+ });
+
+ const int rc = ::connect(sockfd, serv_addr, addrlen);
+ if (rc == 0 || errno != EINPROGRESS) {
+ return rc;
+ }
+#if defined(OS_LINUX)
+ if (bthread_fd_timedwait(sockfd, EPOLLOUT, abstime) < 0) {
+#elif defined(OS_MACOSX)
+ if (bthread_fd_timedwait(sockfd, EVFILT_WRITE, abstime) < 0) {
+#endif
+ return -1;
+ }
+
+ int err;
+ socklen_t errlen = sizeof(err);
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
+ PLOG(FATAL) << "Fail to getsockopt";
+ return -1;
+ }
+ if (err != 0) {
+ CHECK(err != EINPROGRESS);
+ errno = err;
+ return -1;
+ }
+ return 0;
+}
+
// This does not wake pthreads calling bthread_fd_*wait.
int bthread_close(int fd) {
return bthread::get_epoll_thread(fd).fd_close(fd);
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index 5836f60d..5922cc2f 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -78,6 +78,10 @@ extern int bthread_close(int fd);
// Replacement of connect(2) in bthreads.
extern int bthread_connect(int sockfd, const struct sockaddr* serv_addr,
socklen_t addrlen);
+// Suspend caller thread until connect(2) on `sockfd' succeeds or fails, or
+// CLOCK_REALTIME reaches `abstime'. Same as bthread_connect if `abstime' is NULL.
+extern int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+ socklen_t addrlen, const timespec* abstime);
// Add a startup function that each pthread worker will run at the beginning
// To run code at the end, use butil::thread_atexit()
diff --git a/src/butil/endpoint.cpp b/src/butil/endpoint.cpp
index a696aa42..ac1f958c 100644
--- a/src/butil/endpoint.cpp
+++ b/src/butil/endpoint.cpp
@@ -17,6 +17,7 @@
// Date: Mon. Nov 7 14:47:36 CST 2011
+#include "butil/compat.h"
#include <arpa/inet.h> // inet_pton, inet_ntop
#include <netdb.h> // gethostbyname_r
#include <unistd.h> // gethostname
@@ -28,12 +29,18 @@
#include <sys/socket.h> // SO_REUSEADDR SO_REUSEPORT
#include <memory>
#include <gflags/gflags.h>
+#include <sys/poll.h>
+#if defined(OS_MACOSX)
+#include <sys/event.h> // kevent(), kqueue()
+#endif
#include "butil/build_config.h" // OS_MACOSX
#include "butil/fd_guard.h" // fd_guard
#include "butil/endpoint.h" // ip_t
#include "butil/logging.h"
#include "butil/memory/singleton_on_pthread_once.h"
#include "butil/strings/string_piece.h"
+#include "butil/fd_utility.h"
+#include "butil/memory/scope_guard.h"
//supported since Linux 3.9.
DEFINE_bool(reuse_port, false, "Enable SO_REUSEPORT for all listened sockets");
@@ -47,6 +54,11 @@ int BAIDU_WEAK bthread_connect(
int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen) {
return connect(sockfd, serv_addr, addrlen);
}
+
+int BAIDU_WEAK bthread_timed_connect(
+ int sockfd, const struct sockaddr* serv_addr,
+ socklen_t addrlen, const timespec* abstime);
+
__END_DECLS
#include "details/extended_endpoint.hpp"
@@ -380,10 +392,121 @@ int endpoint2hostname(const EndPoint& point, std::string* host) {
return -1;
}
-int tcp_connect(EndPoint point, int* self_port) {
- struct sockaddr_storage serv_addr;
+#if defined(OS_LINUX)
+static short epoll_to_poll_events(uint32_t epoll_events) {
+ // Most POLL* and EPOLL* are same values.
+ short poll_events = (epoll_events &
+ (EPOLLIN | EPOLLPRI | EPOLLOUT |
+ EPOLLRDNORM | EPOLLRDBAND |
+ EPOLLWRNORM | EPOLLWRBAND |
+ EPOLLMSG | EPOLLERR | EPOLLHUP));
+ CHECK_EQ((uint32_t)poll_events, epoll_events);
+ return poll_events;
+}
+#elif defined(OS_MACOSX)
+short kqueue_to_poll_events(int kqueue_events) {
+ //TODO: add more values?
+ short poll_events = 0;
+ if (kqueue_events == EVFILT_READ) {
+ poll_events |= POLLIN;
+ }
+ if (kqueue_events == EVFILT_WRITE) {
+ poll_events |= POLLOUT;
+ }
+ return poll_events;
+}
+#endif
+
+int pthread_fd_wait(int fd, unsigned events,
+ const timespec* abstime) {
+ int diff_ms = -1;
+ if (abstime) {
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+ int64_t now_us = butil::timespec_to_microseconds(now);
+ int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
+ if (abstime_us <= now_us) {
+ errno = ETIMEDOUT;
+ return -1;
+ }
+ diff_ms = (abstime_us - now_us + 999L) / 1000L;
+ }
+#if defined(OS_LINUX)
+ const short poll_events = epoll_to_poll_events(events);
+#elif defined(OS_MACOSX)
+ const short poll_events = kqueue_to_poll_events(events);
+#endif
+ if (poll_events == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ pollfd ufds = { fd, poll_events, 0 };
+ const int rc = poll(&ufds, 1, diff_ms);
+ if (rc < 0) {
+ return -1;
+ }
+ if (rc == 0) {
+ errno = ETIMEDOUT;
+ return -1;
+ }
+ if (ufds.revents & POLLNVAL) {
+ errno = EBADF;
+ return -1;
+ }
+ return 0;
+}
+
+int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+ socklen_t addrlen, const timespec* abstime) {
+ if (abstime == NULL) {
+ return ::connect(sockfd, serv_addr, addrlen);
+ }
+
+ bool is_blocking = butil::is_blocking(sockfd);
+ if (is_blocking) {
+ butil::make_non_blocking(sockfd);
+ }
+ // Scoped non-blocking.
+ auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
+ if (is_blocking) {
+ butil::make_blocking(sockfd);
+ }
+ });
+
+ const int rc = ::connect(sockfd, serv_addr, addrlen);
+ if (rc == 0 || errno != EINPROGRESS) {
+ return rc;
+ }
+#if defined(OS_LINUX)
+ if (pthread_fd_wait(sockfd, EPOLLOUT, abstime) < 0) {
+#elif defined(OS_MACOSX)
+ if (pthread_fd_wait(sockfd, EVFILT_WRITE, abstime) < 0) {
+#endif
+ return -1;
+ }
+
+ int err;
+ socklen_t errlen = sizeof(err);
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
+ PLOG(FATAL) << "Fail to getsockopt";
+ return -1;
+ }
+ if (err != 0) {
+ CHECK(err != EINPROGRESS);
+ errno = err;
+ return -1;
+ }
+ return 0;
+}
+
+int tcp_connect(EndPoint server, int* self_port) {
+ return tcp_connect(server, self_port, -1);
+}
+
+int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms) {
+ struct sockaddr_storage serv_addr{};
socklen_t serv_addr_size = 0;
- if (endpoint2sockaddr(point, &serv_addr, &serv_addr_size) != 0) {
+ if (endpoint2sockaddr(server, &serv_addr, &serv_addr_size) != 0) {
return -1;
}
fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
@@ -391,10 +514,21 @@ int tcp_connect(EndPoint point, int* self_port) {
return -1;
}
int rc = 0;
- if (bthread_connect != NULL) {
- rc = bthread_connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size);
+ if (connect_timeout_ms <= 0) {
+ if (bthread_connect != NULL) {
+ rc = bthread_connect(sockfd, (struct sockaddr*)&serv_addr, serv_addr_size);
+ } else {
+ rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size);
+ }
} else {
- rc = ::connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size);
+ timespec abstime = butil::milliseconds_from_now(connect_timeout_ms);
+ if (bthread_timed_connect != NULL) {
+ rc = bthread_timed_connect(sockfd, (struct sockaddr*)&serv_addr,
+ serv_addr_size, &abstime);
+ } else {
+ rc = pthread_timed_connect(sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, &abstime);
+ }
}
if (rc < 0) {
return -1;
diff --git a/src/butil/endpoint.h b/src/butil/endpoint.h
index 3b9cdf44..c6d00bbb 100644
--- a/src/butil/endpoint.h
+++ b/src/butil/endpoint.h
@@ -130,6 +130,11 @@ int endpoint2hostname(const EndPoint& point, std::string* host);
// into `self_port' if it's not NULL.
// Returns the socket descriptor, -1 otherwise and errno is set.
int tcp_connect(EndPoint server, int* self_port);
+// Same as tcp_connect(server, self_port), except that when
+// `connect_timeout_ms' is positive the attempt is abandoned after that many
+// milliseconds. Write port of this side into `self_port' if it's not NULL.
+// Returns the socket descriptor, -1 otherwise and errno is set.
+int tcp_connect(const EndPoint& server, int* self_port, int connect_timeout_ms);
// Create and listen to a TCP socket bound with `ip_and_port'.
// To enable SO_REUSEADDR for the whole program, enable gflag -reuse_addr
diff --git a/src/butil/fd_utility.cpp b/src/butil/fd_utility.cpp
index e1ca8cc7..45577769 100644
--- a/src/butil/fd_utility.cpp
+++ b/src/butil/fd_utility.cpp
@@ -25,6 +25,11 @@
namespace butil {
+bool is_blocking(int fd) {
+ const int flags = fcntl(fd, F_GETFL, 0);
+ return flags >= 0 && !(flags & O_NONBLOCK);
+}
+
int make_non_blocking(int fd) {
const int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) {
diff --git a/src/butil/fd_utility.h b/src/butil/fd_utility.h
index 8b4f789e..8d93363d 100644
--- a/src/butil/fd_utility.h
+++ b/src/butil/fd_utility.h
@@ -24,6 +24,9 @@
namespace butil {
+// Returns true when fd is blocking, false otherwise (including when fcntl fails).
+bool is_blocking(int fd);
+
// Make file descriptor |fd| non-blocking
// Returns 0 on success, -1 otherwise and errno is set (by fcntl)
int make_non_blocking(int fd);
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
new file mode 100644
index 00000000..1771683f
--- /dev/null
+++ b/src/butil/memory/scope_guard.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_SCOPED_GUARD_H
+#define BRPC_SCOPED_GUARD_H
+
+#include <type_traits>
+
+namespace butil {
+
+// Whether a no-argument callable returns void.
+template<typename T>
+struct returns_void_t
+ : public std::is_same<void, decltype(std::declval<T&&>()())>
+{};
+
+template<typename Callback,
+ typename = typename std::enable_if<
+ returns_void_t<Callback>::value>::type>
+class ScopeGuard;
+
+template<typename Callback>
+ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept;
+
+// ScopeGuard is a simple implementation to guarantee that
+// a function is executed upon leaving the current scope.
+template<typename Callback>
+class ScopeGuard<Callback> {
+public:
+ ScopeGuard(ScopeGuard&& other) noexcept
+ :_callback(std::move(other._callback))
+ , _dismiss(other._dismiss) {
+ other.dismiss();
+ }
+
+ ~ScopeGuard() noexcept {
+ if(!_dismiss) {
+ _callback();
+ }
+ }
+
+ void dismiss() noexcept {
+ _dismiss = true;
+ }
+
+ ScopeGuard() = delete;
+ ScopeGuard(const ScopeGuard&) = delete;
+ ScopeGuard& operator=(const ScopeGuard&) = delete;
+ ScopeGuard& operator=(ScopeGuard&&) = delete;
+
+private:
+// Only MakeScopeGuard and move constructor can create ScopeGuard.
+friend ScopeGuard<Callback> MakeScopeGuard<Callback>(Callback&& callback) noexcept;
+
+ explicit ScopeGuard(Callback&& callback) noexcept
+ :_callback(std::forward<Callback>(callback))
+ , _dismiss(false) {}
+
+private:
+ Callback _callback;
+ bool _dismiss;
+};
+
+// The MakeScopeGuard() function is used to create a new ScopeGuard object.
+// It can be instantiated with a lambda function, a std::function<void()>,
+// a functor, or a void(*)() function pointer.
+template<typename Callback>
+ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept {
+ return ScopeGuard<Callback>{ std::forward<Callback>(callback)};
+}
+
+}
+
+#endif // BRPC_SCOPED_GUARD_H
diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp
index 5e3acb61..ec94f79f 100644
--- a/test/bthread_fd_unittest.cpp
+++ b/test/bthread_fd_unittest.cpp
@@ -26,6 +26,8 @@
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/fd_utility.h"
+#include <butil/endpoint.h>
+#include <butil/fd_guard.h>
#include "butil/logging.h"
#include "bthread/task_control.h"
#include "bthread/task_group.h"
@@ -555,4 +557,41 @@ TEST(FDTest, double_close) {
ASSERT_EQ(-1, bthread_close(fds[1]));
ASSERT_EQ(ec, errno);
}
+
+const char* g_hostname = "baidu.com";
+TEST(FDTest, bthread_connect) {
+ butil::EndPoint ep;
+ ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+
+ {
+ struct sockaddr_storage serv_addr{};
+ socklen_t serv_addr_size = 0;
+ ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+ butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+ ASSERT_LE(0, sockfd);
+ bool is_blocking = butil::is_blocking(sockfd);
+ ASSERT_LE(0, sockfd);
+ ASSERT_EQ(0, bthread_connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size));
+ ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+
+ }
+
+ {
+ struct sockaddr_storage serv_addr{};
+ socklen_t serv_addr_size = 0;
+ ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+ butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+ ASSERT_LE(0, sockfd);
+ bool is_blocking = butil::is_blocking(sockfd);
+ // In most cases, 1 millisecond will result in a connection timeout.
+ timespec abstime = butil::milliseconds_from_now(1);
+ const int rc = bthread_timed_connect(
+ sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, &abstime);
+ ASSERT_EQ(-1, rc);
+ ASSERT_EQ(ETIMEDOUT, errno);
+ ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+ }
+}
+
} // namespace
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index 4cb7f757..fcb23a7b 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -16,12 +16,19 @@
// under the License.
#include <gtest/gtest.h>
+#include <butil/fd_guard.h>
+#include <butil/fd_utility.h>
#include "butil/errno.h"
#include "butil/endpoint.h"
#include "butil/logging.h"
#include "butil/containers/flat_map.h"
#include "butil/details/extended_endpoint.hpp"
+namespace butil {
+int pthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
+ socklen_t addrlen, const timespec* abstime);
+}
+
namespace {
using butil::details::ExtendedEndPoint;
@@ -472,4 +479,53 @@ TEST(EndPointTest, endpoint_concurrency) {
}
}
+const char* g_hostname = "baidu.com";
+
+TEST(EndPointTest, tcp_connect) {
+ butil::EndPoint ep;
+ ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep));
+ {
+ butil::fd_guard sockfd(butil::tcp_connect(ep, NULL));
+ ASSERT_LE(0, sockfd);
+ }
+ {
+ butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1000));
+ ASSERT_LE(0, sockfd);
+ }
+ {
+ butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1));
+ ASSERT_EQ(-1, sockfd);
+ ASSERT_EQ(ETIMEDOUT, errno);
+ }
+
+ {
+ struct sockaddr_storage serv_addr{};
+ socklen_t serv_addr_size = 0;
+ ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+ butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+ ASSERT_LE(0, sockfd);
+ bool is_blocking = butil::is_blocking(sockfd);
+ ASSERT_EQ(0, butil::pthread_timed_connect(
+ sockfd, (struct sockaddr*) &serv_addr, serv_addr_size, NULL));
+ ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+ }
+
+ {
+ struct sockaddr_storage serv_addr{};
+ socklen_t serv_addr_size = 0;
+ ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size));
+ butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
+ ASSERT_LE(0, sockfd);
+ bool is_blocking = butil::is_blocking(sockfd);
+ // In most cases, 1 millisecond will result in a connection timeout.
+ timespec abstime = butil::milliseconds_from_now(1);
+ const int rc = butil::pthread_timed_connect(
+ sockfd, (struct sockaddr*) &serv_addr,
+ serv_addr_size, &abstime);
+ ASSERT_EQ(-1, rc);
+ ASSERT_EQ(ETIMEDOUT, errno);
+ ASSERT_EQ(is_blocking, butil::is_blocking(sockfd));
+ }
+}
+
} // end of namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]