This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 90ea369 Depend on the independent Asio instead of Boost.Asio by
default (#382)
90ea369 is described below
commit 90ea3695b80660f837785d98e96047d90de3f64f
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jan 8 02:13:49 2024 +0800
Depend on the independent Asio instead of Boost.Asio by default (#382)
Fixes https://github.com/apache/pulsar-client-cpp/issues/367
### Motivation
See the difference of Asio and Boost.Asio here:
https://think-async.com/Asio/AsioAndBoostAsio.html
Asio is updated more frequently than Boost.Asio and its release does not
need to be synchronous with other Boost components. Depending on the
independent Asio could make it easier for a newer Asio release.
### Modifications
Import `asio` 1.28.2 as the dependency and remove the `boost-asio`
dependency from the vcpkg.json. Since the latest Asio already removed
the `deadline_timer`, this patch replaces all `deadline_timer` with
`steady_timer`, which uses `std::chrono` rather than Boost.Date_Time
component to compute the timeout.
Add a `USE_ASIO` CMake option to determine whether Asio or Boost.Asio is
depended. For vcpkg users, the option is always enabled.
Finally, simplify the vcpkg.json by removing some `boost-*` dependencies
depended indirectly by the rest two major dependencies:
- boost-accumulators: latency percentiles computation
- boost-property-tree: JSON operations
These two dependencies are hard to remove for now unless introducing
other dependencies so they will be kept from some time.
---
.github/workflows/ci-build-binary-artifacts.yaml | 2 +
.github/workflows/ci-pr-validation.yaml | 2 +
CMakeLists.txt | 11 +
LegacyFindPackages.cmake | 4 -
lib/AckGroupingTrackerEnabled.cc | 6 +-
lib/AckGroupingTrackerEnabled.h | 3 +-
.../ProducerStatsDisabled.h => AsioDefines.h} | 25 +-
lib/{TimeUtils.cc => AsioTimer.h} | 19 +-
lib/Backoff.cc | 7 +-
lib/Backoff.h | 8 +-
lib/ClientConnection.cc | 254 +++++++++------------
lib/ClientConnection.h | 73 +++---
lib/CompressionCodec.cc | 1 -
lib/ConnectionPool.cc | 9 +-
lib/ConsumerImpl.cc | 19 +-
lib/ConsumerImplBase.cc | 4 +-
lib/ExecutorService.cc | 24 +-
lib/ExecutorService.h | 20 +-
lib/HandlerBase.cc | 9 +-
lib/HandlerBase.h | 10 +-
lib/Int64SerDes.h | 7 +-
lib/MultiTopicsConsumerImpl.cc | 10 +-
lib/MultiTopicsConsumerImpl.h | 3 +-
lib/NegativeAcksTracker.cc | 12 +-
lib/NegativeAcksTracker.h | 8 +-
lib/OpSendMsg.h | 6 +-
lib/PartitionedProducerImpl.cc | 8 +-
lib/PartitionedProducerImpl.h | 6 +-
lib/PatternMultiTopicsConsumerImpl.cc | 12 +-
lib/PatternMultiTopicsConsumerImpl.h | 5 +-
lib/PeriodicTask.cc | 8 +-
lib/PeriodicTask.h | 2 +-
lib/ProducerImpl.cc | 25 +-
lib/ProducerImpl.h | 15 +-
lib/RetryableOperation.h | 23 +-
lib/RoundRobinMessageRouter.cc | 5 +-
lib/RoundRobinMessageRouter.h | 6 +-
lib/SharedBuffer.h | 27 ++-
lib/TimeUtils.h | 16 +-
lib/UnAckedMessageTrackerEnabled.cc | 6 +-
lib/UnAckedMessageTrackerEnabled.h | 3 +-
lib/auth/athenz/ZTSClient.cc | 2 -
lib/stats/ConsumerStatsImpl.cc | 6 +-
lib/stats/ConsumerStatsImpl.h | 5 +-
lib/stats/ProducerStatsBase.h | 4 +-
lib/stats/ProducerStatsDisabled.h | 2 +-
lib/stats/ProducerStatsImpl.cc | 15 +-
lib/stats/ProducerStatsImpl.h | 8 +-
tests/AuthPluginTest.cc | 12 +-
tests/AuthTokenTest.cc | 1 -
tests/BackoffTest.cc | 26 +--
tests/ConsumerTest.cc | 2 +-
tests/PulsarFriend.h | 2 +-
tests/RoundRobinMessageRouterTest.cc | 16 +-
vcpkg.json | 37 +--
55 files changed, 422 insertions(+), 439 deletions(-)
diff --git a/.github/workflows/ci-build-binary-artifacts.yaml
b/.github/workflows/ci-build-binary-artifacts.yaml
index a2b7be8..63644e5 100644
--- a/.github/workflows/ci-build-binary-artifacts.yaml
+++ b/.github/workflows/ci-build-binary-artifacts.yaml
@@ -148,6 +148,7 @@ jobs:
mkdir -p $BUILD_DIR
cmake -B $BUILD_DIR \
-G "${{ matrix.generator }}" ${{ matrix.arch }} \
+ -DUSE_ASIO=ON \
-DBUILD_TESTS=OFF \
-DVCPKG_TRIPLET=${{ matrix.triplet }} \
-DCMAKE_INSTALL_PREFIX=${{ env.INSTALL_DIR }} \
@@ -174,6 +175,7 @@ jobs:
mkdir -p $BUILD_DIR
cmake -B $BUILD_DIR \
-G "${{ matrix.generator }}" ${{ matrix.arch }} \
+ -DUSE_ASIO=ON \
-DBUILD_TESTS=OFF \
-DVCPKG_TRIPLET=${{ matrix.triplet }} \
-DCMAKE_INSTALL_PREFIX=$INSTALL_DIR_DEBUG \
diff --git a/.github/workflows/ci-pr-validation.yaml
b/.github/workflows/ci-pr-validation.yaml
index 50a5c80..56309e9 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -191,6 +191,7 @@ jobs:
cmake \
-B ./build-1 \
-G "${{ matrix.generator }}" ${{ matrix.arch }} \
+ -DUSE_ASIO=ON \
-DBUILD_TESTS=OFF \
-DVCPKG_TRIPLET="${{ matrix.triplet }}" \
-DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \
@@ -232,6 +233,7 @@ jobs:
cmake \
-B ./build-2 \
-G "${{ matrix.generator }}" ${{ matrix.arch }} \
+ -DUSE_ASIO=ON \
-DBUILD_TESTS=OFF \
-DVCPKG_TRIPLET="${{ matrix.triplet }}" \
-DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5bde2b7..662e84a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -19,8 +19,11 @@
cmake_minimum_required(VERSION 3.13)
+option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
+
option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
if (INTEGRATE_VCPKG)
+ set(USE_ASIO ON)
set(CMAKE_TOOLCHAIN_FILE
"${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
endif ()
@@ -129,6 +132,10 @@ if (INTEGRATE_VCPKG)
$<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>
Snappy::snappy
)
+ if (USE_ASIO)
+ find_package(asio CONFIG REQUIRED)
+ set(COMMON_LIBS ${COMMON_LIBS} asio::asio)
+ endif ()
add_definitions(-DHAS_ZSTD -DHAS_SNAPPY)
if (MSVC)
find_package(dlfcn-win32 CONFIG REQUIRED)
@@ -140,6 +147,10 @@ else ()
include(./LegacyFindPackages.cmake)
endif ()
+if (USE_ASIO)
+ add_definitions(-DUSE_ASIO)
+endif ()
+
set(LIB_NAME $ENV{PULSAR_LIBRARY_NAME})
if (NOT LIB_NAME)
set(LIB_NAME pulsar)
diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake
index 10c9fa7..5004545 100644
--- a/LegacyFindPackages.cmake
+++ b/LegacyFindPackages.cmake
@@ -176,10 +176,6 @@ if (Boost_MAJOR_VERSION EQUAL 1 AND Boost_MINOR_VERSION
LESS 69)
MESSAGE(STATUS "Linking with Boost:System")
endif()
-if (MSVC)
- set(BOOST_COMPONENTS ${BOOST_COMPONENTS} date_time)
-endif()
-
if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9)
# GCC 4.8.2 implementation of std::regex is buggy
set(BOOST_COMPONENTS ${BOOST_COMPONENTS} regex)
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index d90cc87..7233b2c 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -117,7 +117,7 @@ void AckGroupingTrackerEnabled::close() {
this->flush();
std::lock_guard<std::mutex> lock(this->mutexTimer_);
if (this->timer_) {
- boost::system::error_code ec;
+ ASIO_ERROR ec;
this->timer_->cancel(ec);
}
}
@@ -168,9 +168,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
-
this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L,
this->ackGroupingTimeMs_)));
+ this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L,
this->ackGroupingTimeMs_)));
auto self = shared_from_this();
- this->timer_->async_wait([this, self](const boost::system::error_code& ec)
-> void {
+ this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
if (!ec) {
this->flush();
this->scheduleTimer();
diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h
index ec1d66b..b04f405 100644
--- a/lib/AckGroupingTrackerEnabled.h
+++ b/lib/AckGroupingTrackerEnabled.h
@@ -22,18 +22,17 @@
#include <pulsar/MessageId.h>
#include <atomic>
-#include <boost/asio/deadline_timer.hpp>
#include <cstdint>
#include <mutex>
#include <set>
#include "AckGroupingTracker.h"
+#include "AsioTimer.h"
namespace pulsar {
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
class HandlerBase;
diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/AsioDefines.h
similarity index 64%
copy from lib/stats/ProducerStatsDisabled.h
copy to lib/AsioDefines.h
index df1df0f..2e89812 100644
--- a/lib/stats/ProducerStatsDisabled.h
+++ b/lib/AsioDefines.h
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+// This header defines common macros to use Asio or Boost.Asio.
+#pragma once
-#ifndef PULSAR_PRODUCER_STATS_DISABLED_HEADER
-#define PULSAR_PRODUCER_STATS_DISABLED_HEADER
-#include "ProducerStatsBase.h"
-
-namespace pulsar {
-class ProducerStatsDisabled : public ProducerStatsBase {
- public:
- virtual void messageSent(const Message& msg){};
- virtual void messageReceived(Result, const boost::posix_time::ptime&){};
-};
-} // namespace pulsar
-#endif // PULSAR_PRODUCER_STATS_DISABLED_HEADER
+#ifdef USE_ASIO
+#define ASIO ::asio
+#define ASIO_ERROR asio::error_code
+#define ASIO_SUCCESS (ASIO_ERROR{})
+#define ASIO_SYSTEM_ERROR asio::system_error
+#else
+#define ASIO boost::asio
+#define ASIO_ERROR boost::system::error_code
+#define ASIO_SUCCESS
boost::system::errc::make_error_code(boost::system::errc::success)
+#define ASIO_SYSTEM_ERROR boost::system::system_error
+#endif
diff --git a/lib/TimeUtils.cc b/lib/AsioTimer.h
similarity index 71%
rename from lib/TimeUtils.cc
rename to lib/AsioTimer.h
index 7eecb86..d0c3de5 100644
--- a/lib/TimeUtils.cc
+++ b/lib/AsioTimer.h
@@ -16,17 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+#pragma once
-#include "TimeUtils.h"
+#ifdef USE_ASIO
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/steady_timer.hpp>
+#endif
-namespace pulsar {
+#include <memory>
-ptime TimeUtils::now() { return microsec_clock::universal_time(); }
+#include "AsioDefines.h"
-int64_t TimeUtils::currentTimeMillis() {
- static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1));
-
- time_duration diff = now() - time_t_epoch;
- return diff.total_milliseconds();
-}
-} // namespace pulsar
\ No newline at end of file
+using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
diff --git a/lib/Backoff.cc b/lib/Backoff.cc
index fdb1359..e2c43d1 100644
--- a/lib/Backoff.cc
+++ b/lib/Backoff.cc
@@ -21,6 +21,9 @@
#include <time.h> /* time */
#include <algorithm>
+#include <chrono>
+
+#include "TimeUtils.h"
namespace pulsar {
@@ -33,8 +36,8 @@ TimeDuration Backoff::next() {
// Check for mandatory stop
if (!mandatoryStopMade_) {
- const boost::posix_time::ptime& now =
boost::posix_time::microsec_clock::universal_time();
- TimeDuration timeElapsedSinceFirstBackoff =
boost::posix_time::milliseconds(0);
+ auto now = TimeUtils::now();
+ TimeDuration timeElapsedSinceFirstBackoff =
std::chrono::nanoseconds(0);
if (initial_ == current) {
firstBackoffTime_ = now;
} else {
diff --git a/lib/Backoff.h b/lib/Backoff.h
index a59c00f..d9d7fae 100644
--- a/lib/Backoff.h
+++ b/lib/Backoff.h
@@ -20,12 +20,12 @@
#define _PULSAR_BACKOFF_HEADER_
#include <pulsar/defines.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <chrono>
#include <random>
-namespace pulsar {
+#include "TimeUtils.h"
-using TimeDuration = boost::posix_time::time_duration;
+namespace pulsar {
class PULSAR_PUBLIC Backoff {
public:
@@ -38,7 +38,7 @@ class PULSAR_PUBLIC Backoff {
const TimeDuration max_;
TimeDuration next_;
TimeDuration mandatoryStop_;
- boost::posix_time::ptime firstBackoffTime_;
+ decltype(std::chrono::high_resolution_clock::now()) firstBackoffTime_;
std::mt19937 rng_;
bool mandatoryStopMade_ = false;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 61aa7f7..82ab492 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -23,6 +23,7 @@
#include <boost/optional.hpp>
#include <fstream>
+#include "AsioDefines.h"
#include "Commands.h"
#include "ConnectionPool.h"
#include "ConsumerImpl.h"
@@ -39,7 +40,7 @@
DECLARE_LOG_OBJECT()
-using namespace boost::asio::ip;
+using namespace ASIO::ip;
namespace pulsar {
@@ -162,19 +163,13 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
const ClientConfiguration&
clientConfiguration,
const AuthenticationPtr& authentication,
const std::string& clientVersion,
ConnectionPool& pool, size_t poolIndex)
- :
operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
+ :
operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())),
authentication_(authentication),
serverProtocolVersion_(proto::ProtocolVersion_MIN),
executor_(executor),
resolver_(executor_->createTcpResolver()),
socket_(executor_->createSocket()),
-#if BOOST_VERSION >= 107000
-
strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
-#elif BOOST_VERSION >= 106600
- strand_(executor_->getIOService().get_executor()),
-#else
- strand_(executor_->getIOService()),
-#endif
+ strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -203,11 +198,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
}
if (clientConfiguration.isUseTls()) {
-#if BOOST_VERSION >= 105400
- boost::asio::ssl::context
ctx(boost::asio::ssl::context::tlsv12_client);
-#else
- boost::asio::ssl::context ctx(executor_->getIOService(),
boost::asio::ssl::context::tlsv1_client);
-#endif
+ ASIO::ssl::context ctx(ASIO::ssl::context::tlsv12_client);
Url serviceUrl;
Url proxyUrl;
Url::parse(physicalAddress, serviceUrl);
@@ -219,10 +210,10 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
}
if (clientConfiguration.isTlsAllowInsecureConnection()) {
- ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
+ ctx.set_verify_mode(ASIO::ssl::context::verify_none);
isTlsAllowInsecureConnection_ = true;
} else {
- ctx.set_verify_mode(boost::asio::ssl::context::verify_peer);
+ ctx.set_verify_mode(ASIO::ssl::context::verify_peer);
std::string trustCertFilePath =
clientConfiguration.getTlsTrustCertsFilePath();
if (!trustCertFilePath.empty()) {
@@ -252,12 +243,12 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
throw ResultAuthenticationError;
}
- ctx.use_private_key_file(tlsPrivateKey,
boost::asio::ssl::context::pem);
- ctx.use_certificate_file(tlsCertificates,
boost::asio::ssl::context::pem);
+ ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
+ ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem);
} else {
if (file_exists(tlsPrivateKey) && file_exists(tlsCertificates)) {
- ctx.use_private_key_file(tlsPrivateKey,
boost::asio::ssl::context::pem);
- ctx.use_certificate_file(tlsCertificates,
boost::asio::ssl::context::pem);
+ ctx.use_private_key_file(tlsPrivateKey,
ASIO::ssl::context::pem);
+ ctx.use_certificate_file(tlsCertificates,
ASIO::ssl::context::pem);
}
}
@@ -266,14 +257,13 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
if (!clientConfiguration.isTlsAllowInsecureConnection() &&
clientConfiguration.isValidateHostName()) {
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":"
<< serviceUrl.port());
std::string urlHost = isSniProxy_ ? proxyUrl.host() :
serviceUrl.host();
-
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost));
+
tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
}
LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(),
serviceUrl.host().c_str())) {
- boost::system::error_code ec{static_cast<int>(::ERR_get_error()),
-
boost::asio::error::get_ssl_category()};
- LOG_ERROR(boost::system::system_error{ec}.what() << ": Error while
setting TLS SNI");
+ ASIO_ERROR ec{static_cast<int>(::ERR_get_error()),
ASIO::error::get_ssl_category()};
+ LOG_ERROR(ec.message() << ": Error while setting TLS SNI");
return;
}
}
@@ -310,9 +300,9 @@ void ClientConnection::handlePulsarConnected(const
proto::CommandConnected& cmdC
// Only send keep-alive probes if the broker supports it
keepAliveTimer_ = executor_->createDeadlineTimer();
if (keepAliveTimer_) {
-
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+
keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
auto weakSelf = weak_from_this();
- keepAliveTimer_->async_wait([weakSelf](const
boost::system::error_code&) {
+ keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
if (self) {
self->handleKeepAliveTimeout();
@@ -357,13 +347,12 @@ void
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
auto weakSelf = weak_from_this();
- consumerStatsRequestTimer_->async_wait(
- [weakSelf, consumerStatsRequests](const boost::system::error_code&
err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleConsumerStatsTimeout(err,
consumerStatsRequests);
- }
- });
+ consumerStatsRequestTimer_->async_wait([weakSelf,
consumerStatsRequests](const ASIO_ERROR& err) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleConsumerStatsTimeout(err, consumerStatsRequests);
+ }
+ });
}
lock.unlock();
// Complex logic since promises need to be fulfilled outside the lock
@@ -375,19 +364,19 @@ void
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
/// The number of unacknowledged probes to send before considering the
connection dead and notifying the
/// application layer
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT>
tcp_keep_alive_count;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT>
tcp_keep_alive_count;
/// The interval between subsequential keepalive probes, regardless of what
the connection has exchanged in
/// the meantime
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP,
TCP_KEEPINTVL> tcp_keep_alive_interval;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPINTVL>
tcp_keep_alive_interval;
/// The interval between the last data packet sent (simple ACKs are not
considered data) and the first
/// keepalive
/// probe; after the connection is marked to need keepalive, this counter is
not used any further
#ifdef __APPLE__
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP,
TCP_KEEPALIVE> tcp_keep_alive_idle;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE>
tcp_keep_alive_idle;
#else
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE>
tcp_keep_alive_idle;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE>
tcp_keep_alive_idle;
#endif
/*
@@ -396,15 +385,14 @@ typedef
boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> t
* if async_connect without any error, connected_ would be set to true
* at this point the connection is deemed valid to be used by clients of this
class
*/
-void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
- tcp::resolver::iterator
endpointIterator) {
+void ClientConnection::handleTcpConnected(const ASIO_ERROR& err,
tcp::resolver::iterator endpointIterator) {
if (!err) {
std::stringstream cnxStringStream;
try {
cnxStringStream << "[" << socket_->local_endpoint() << " -> " <<
socket_->remote_endpoint()
<< "] ";
cnxString_ = cnxStringStream.str();
- } catch (const boost::system::system_error& e) {
+ } catch (const ASIO_SYSTEM_ERROR& e) {
LOG_ERROR("Failed to get endpoint: " << e.what());
close(ResultRetryable);
return;
@@ -424,7 +412,7 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
state_ = TcpConnected;
lock.unlock();
- boost::system::error_code error;
+ ASIO_ERROR error;
socket_->set_option(tcp::no_delay(true), error);
if (error) {
LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " <<
error.message());
@@ -457,7 +445,7 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
if (tlsSocket_) {
if (!isTlsAllowInsecureConnection_) {
- boost::system::error_code err;
+ ASIO_ERROR err;
Url service_url;
if (!Url::parse(physicalAddress_, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: "
<< err << " " << err.message());
@@ -466,26 +454,21 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
}
}
auto weakSelf = weak_from_this();
- auto callback = [weakSelf](const boost::system::error_code& err) {
+ auto callback = [weakSelf](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
if (self) {
self->handleHandshake(err);
}
};
-#if BOOST_VERSION >= 106600
-
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
- boost::asio::bind_executor(strand_,
callback));
-#else
-
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
- strand_.wrap(callback));
-#endif
+ tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
+ ASIO::bind_executor(strand_,
callback));
} else {
-
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
+ handleHandshake(ASIO_SUCCESS);
}
} else if (endpointIterator != tcp::resolver::iterator()) {
LOG_WARN(cnxString_ << "Failed to establish connection: " <<
err.message());
// The connection failed. Try the next endpoint in the list.
- boost::system::error_code closeError;
+ ASIO_ERROR closeError;
socket_->close(closeError); // ignore the error of close
if (closeError) {
LOG_WARN(cnxString_ << "Failed to close socket: " <<
err.message());
@@ -497,15 +480,14 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
auto weakSelf = weak_from_this();
- socket_->async_connect(endpoint,
- [weakSelf, endpointIterator](const
boost::system::error_code& err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleTcpConnected(err,
endpointIterator);
- }
- });
+ socket_->async_connect(endpoint, [weakSelf,
endpointIterator](const ASIO_ERROR& err) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleTcpConnected(err, endpointIterator);
+ }
+ });
} else {
- if (err == boost::asio::error::operation_aborted) {
+ if (err == ASIO::error::operation_aborted) {
// TCP connect timeout, which is not retryable
close();
} else {
@@ -518,7 +500,7 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
}
}
-void ClientConnection::handleHandshake(const boost::system::error_code& err) {
+void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
if (err) {
LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
close();
@@ -537,13 +519,12 @@ void ClientConnection::handleHandshake(const
boost::system::error_code& err) {
// Send CONNECT command to broker
auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
- customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err, size_t) {
+ customAllocWriteHandler([this, self, buffer](const ASIO_ERROR&
err, size_t) {
handleSentPulsarConnect(err, buffer);
}));
}
-void ClientConnection::handleSentPulsarConnect(const
boost::system::error_code& err,
- const SharedBuffer& buffer) {
+void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const
SharedBuffer& buffer) {
if (isClosed()) {
return;
}
@@ -557,8 +538,7 @@ void ClientConnection::handleSentPulsarConnect(const
boost::system::error_code&
readNextCommand();
}
-void ClientConnection::handleSentAuthResponse(const boost::system::error_code&
err,
- const SharedBuffer& buffer) {
+void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const
SharedBuffer& buffer) {
if (isClosed()) {
return;
}
@@ -580,7 +560,7 @@ void ClientConnection::tcpConnectAsync() {
return;
}
- boost::system::error_code err;
+ ASIO_ERROR err;
Url service_url;
std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
if (!Url::parse(hostUrl, service_url)) {
@@ -599,17 +579,15 @@ void ClientConnection::tcpConnectAsync() {
LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" <<
service_url.port());
tcp::resolver::query query(service_url.host(),
std::to_string(service_url.port()));
auto weakSelf = weak_from_this();
- resolver_->async_resolve(
- query, [weakSelf](const boost::system::error_code& err,
tcp::resolver::iterator iterator) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleResolve(err, iterator);
- }
- });
+ resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err,
tcp::resolver::iterator iterator) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleResolve(err, iterator);
+ }
+ });
}
-void ClientConnection::handleResolve(const boost::system::error_code& err,
- tcp::resolver::iterator endpointIterator)
{
+void ClientConnection::handleResolve(const ASIO_ERROR& err,
tcp::resolver::iterator endpointIterator) {
if (err) {
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " <<
err.message());
@@ -642,13 +620,12 @@ void ClientConnection::handleResolve(const
boost::system::error_code& err,
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " <<
endpointIterator->host_name() //
<< " to " << endpointIterator->endpoint());
- socket_->async_connect(*endpointIterator,
- [weakSelf, endpointIterator](const
boost::system::error_code& err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleTcpConnected(err,
endpointIterator);
- }
- });
+ socket_->async_connect(*endpointIterator, [weakSelf,
endpointIterator](const ASIO_ERROR& err) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleTcpConnected(err, endpointIterator);
+ }
+ });
} else {
LOG_WARN(cnxString_ << "No IP address found");
close();
@@ -659,15 +636,13 @@ void ClientConnection::handleResolve(const
boost::system::error_code& err,
void ClientConnection::readNextCommand() {
const static uint32_t minReadSize = sizeof(uint32_t);
auto self = shared_from_this();
- asyncReceive(
- incomingBuffer_.asio_buffer(),
- customAllocReadHandler([this, self](const boost::system::error_code&
err, size_t bytesTransferred) {
- handleRead(err, bytesTransferred, minReadSize);
- }));
+ asyncReceive(incomingBuffer_.asio_buffer(),
+ customAllocReadHandler([this, self](const ASIO_ERROR& err,
size_t bytesTransferred) {
+ handleRead(err, bytesTransferred, minReadSize);
+ }));
}
-void ClientConnection::handleRead(const boost::system::error_code& err, size_t
bytesTransferred,
- uint32_t minReadSize) {
+void ClientConnection::handleRead(const ASIO_ERROR& err, size_t
bytesTransferred, uint32_t minReadSize) {
if (isClosed()) {
return;
}
@@ -675,9 +650,9 @@ void ClientConnection::handleRead(const
boost::system::error_code& err, size_t b
incomingBuffer_.bytesWritten(bytesTransferred);
if (err || bytesTransferred == 0) {
- if (err == boost::asio::error::operation_aborted) {
+ if (err == ASIO::error::operation_aborted) {
LOG_DEBUG(cnxString_ << "Read operation was canceled: " <<
err.message());
- } else if (bytesTransferred == 0 || err == boost::asio::error::eof) {
+ } else if (bytesTransferred == 0 || err == ASIO::error::eof) {
LOG_DEBUG(cnxString_ << "Server closed the connection: " <<
err.message());
} else {
LOG_ERROR(cnxString_ << "Read operation failed: " <<
err.message());
@@ -689,11 +664,11 @@ void ClientConnection::handleRead(const
boost::system::error_code& err, size_t b
SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
auto self = shared_from_this();
auto nextMinReadSize = minReadSize - bytesTransferred;
- asyncReceive(buffer.asio_buffer(), customAllocReadHandler([this, self,
nextMinReadSize](
- const
boost::system::error_code& err,
- size_t
bytesTransferred) {
- handleRead(err, bytesTransferred, nextMinReadSize);
- }));
+ asyncReceive(buffer.asio_buffer(),
+ customAllocReadHandler(
+ [this, self, nextMinReadSize](const ASIO_ERROR& err,
size_t bytesTransferred) {
+ handleRead(err, bytesTransferred,
nextMinReadSize);
+ }));
} else {
processIncomingBuffer();
}
@@ -720,12 +695,11 @@ void ClientConnection::processIncomingBuffer() {
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_,
newBufferSize);
}
auto self = shared_from_this();
- asyncReceive(
- incomingBuffer_.asio_buffer(),
- customAllocReadHandler([this, self, bytesToReceive](const
boost::system::error_code& err,
- size_t
bytesTransferred) {
- handleRead(err, bytesTransferred, bytesToReceive);
- }));
+ asyncReceive(incomingBuffer_.asio_buffer(),
+ customAllocReadHandler(
+ [this, self, bytesToReceive](const ASIO_ERROR&
err, size_t bytesTransferred) {
+ handleRead(err, bytesTransferred,
bytesToReceive);
+ }));
return;
}
@@ -803,11 +777,11 @@ void ClientConnection::processIncomingBuffer() {
uint32_t minReadSize = sizeof(uint32_t) -
incomingBuffer_.readableBytes();
auto self = shared_from_this();
- asyncReceive(incomingBuffer_.asio_buffer(),
- customAllocReadHandler([this, self, minReadSize](const
boost::system::error_code& err,
- size_t
bytesTransferred) {
- handleRead(err, bytesTransferred, minReadSize);
- }));
+ asyncReceive(
+ incomingBuffer_.asio_buffer(),
+ customAllocReadHandler([this, self, minReadSize](const ASIO_ERROR&
err, size_t bytesTransferred) {
+ handleRead(err, bytesTransferred, minReadSize);
+ }));
return;
}
@@ -1056,7 +1030,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd,
const uint64_t request
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
auto weakSelf = weak_from_this();
- requestData.timer->async_wait([weakSelf, requestData](const
boost::system::error_code& ec) {
+ requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR&
ec) {
auto self = weakSelf.lock();
if (self) {
self->handleLookupTimeout(ec, requestData);
@@ -1082,11 +1056,7 @@ void ClientConnection::sendCommand(const SharedBuffer&
cmd) {
self->sendCommandInternal(cmd);
}
};
-#if BOOST_VERSION >= 106600
- boost::asio::post(strand_, callback);
-#else
- strand_.post(callback);
-#endif
+ ASIO::post(strand_, callback);
} else {
sendCommandInternal(cmd);
}
@@ -1099,8 +1069,9 @@ void ClientConnection::sendCommand(const SharedBuffer&
cmd) {
void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
auto self = shared_from_this();
asyncWrite(cmd.const_asio_buffer(),
- customAllocWriteHandler([this, self, cmd](const
boost::system::error_code& err,
- size_t
bytesTransferred) { handleSend(err, cmd); }));
+ customAllocWriteHandler([this, self, cmd](const ASIO_ERROR&
err, size_t bytesTransferred) {
+ handleSend(err, cmd);
+ }));
}
void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args)
{
@@ -1116,21 +1087,18 @@ void ClientConnection::sendMessage(const
std::shared_ptr<SendArguments>& args) {
// Capture the buffer because asio does not copy the buffer, if the
buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to the
underlying socket send.
asyncWrite(buffer, customAllocWriteHandler(
- [this, self, buffer](const
boost::system::error_code& err,
- size_t bytesTransferred) {
handleSendPair(err); }));
+ [this, self, buffer](const ASIO_ERROR& err,
size_t bytesTransferred) {
+ handleSendPair(err);
+ }));
};
if (tlsSocket_) {
-#if BOOST_VERSION >= 106600
- boost::asio::post(strand_, sendMessageInternal);
-#else
- strand_.post(sendMessageInternal);
-#endif
+ ASIO::post(strand_, sendMessageInternal);
} else {
sendMessageInternal();
}
}
-void ClientConnection::handleSend(const boost::system::error_code& err, const
SharedBuffer&) {
+void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) {
if (isClosed()) {
return;
}
@@ -1142,7 +1110,7 @@ void ClientConnection::handleSend(const
boost::system::error_code& err, const Sh
}
}
-void ClientConnection::handleSendPair(const boost::system::error_code& err) {
+void ClientConnection::handleSendPair(const ASIO_ERROR& err) {
if (isClosed()) {
return;
}
@@ -1166,8 +1134,8 @@ void ClientConnection::sendPendingCommands() {
if (any.type() == typeid(SharedBuffer)) {
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
asyncWrite(buffer.const_asio_buffer(),
- customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err,
- size_t) {
handleSend(err, buffer); }));
+ customAllocWriteHandler(
+ [this, self, buffer](const ASIO_ERROR& err, size_t)
{ handleSend(err, buffer); }));
} else {
assert(any.type() == typeid(std::shared_ptr<SendArguments>));
@@ -1178,9 +1146,9 @@ void ClientConnection::sendPendingCommands() {
// Capture the buffer because asio does not copy the buffer, if
the buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to
the underlying socket send.
- asyncWrite(buffer,
- customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err,
- size_t) {
handleSendPair(err); }));
+ asyncWrite(buffer, customAllocWriteHandler([this, self,
buffer](const ASIO_ERROR& err, size_t) {
+ handleSendPair(err);
+ }));
}
} else {
// No more pending writes
@@ -1202,7 +1170,7 @@ Future<Result, ResponseData>
ClientConnection::sendRequestWithId(SharedBuffer cm
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
auto weakSelf = weak_from_this();
- requestData.timer->async_wait([weakSelf, requestData](const
boost::system::error_code& ec) {
+ requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR&
ec) {
auto self = weakSelf.lock();
if (self) {
self->handleRequestTimeout(ec, requestData);
@@ -1216,21 +1184,19 @@ Future<Result, ResponseData>
ClientConnection::sendRequestWithId(SharedBuffer cm
return requestData.promise.getFuture();
}
-void ClientConnection::handleRequestTimeout(const boost::system::error_code&
ec,
- PendingRequestData
pendingRequestData) {
+void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
PendingRequestData pendingRequestData) {
if (!ec && !pendingRequestData.hasGotResponse->load()) {
pendingRequestData.promise.setFailed(ResultTimeout);
}
}
-void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec,
- LookupRequestData
pendingRequestData) {
+void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
LookupRequestData pendingRequestData) {
if (!ec) {
pendingRequestData.promise->setFailed(ResultTimeout);
}
}
-void ClientConnection::handleGetLastMessageIdTimeout(const
boost::system::error_code& ec,
+void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
ClientConnection::LastMessageIdRequestData data) {
if (!ec) {
data.promise->setFailed(ResultTimeout);
@@ -1255,9 +1221,9 @@ void ClientConnection::handleKeepAliveTimeout() {
// be zero And we do not attempt to dereference the pointer.
Lock lock(mutex_);
if (keepAliveTimer_) {
-
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+
keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
auto weakSelf = weak_from_this();
- keepAliveTimer_->async_wait([weakSelf](const
boost::system::error_code&) {
+ keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
if (self) {
self->handleKeepAliveTimeout();
@@ -1268,7 +1234,7 @@ void ClientConnection::handleKeepAliveTimeout() {
}
}
-void ClientConnection::handleConsumerStatsTimeout(const
boost::system::error_code& ec,
+void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
std::vector<uint64_t>
consumerStatsRequests) {
if (ec) {
LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec
<< "]");
@@ -1285,15 +1251,15 @@ void ClientConnection::close(Result result, bool
detach) {
state_ = Disconnected;
if (socket_) {
- boost::system::error_code err;
- socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
+ ASIO_ERROR err;
+ socket_->shutdown(ASIO::socket_base::shutdown_both, err);
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " <<
err.message());
}
}
if (tlsSocket_) {
- boost::system::error_code err;
+ ASIO_ERROR err;
tlsSocket_->lowest_layer().close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close TLS socket: " <<
err.message());
@@ -1432,7 +1398,7 @@ Future<Result, GetLastMessageIdResponse>
ClientConnection::newGetLastMessageId(u
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
auto weakSelf = weak_from_this();
- requestData.timer->async_wait([weakSelf, requestData](const
boost::system::error_code& ec) {
+ requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR&
ec) {
auto self = weakSelf.lock();
if (self) {
self->handleGetLastMessageIdTimeout(ec, requestData);
@@ -1823,7 +1789,7 @@ void ClientConnection::handleAuthChallenge() {
}
auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
- customAllocWriteHandler([this, self, buffer](const
boost::system::error_code& err, size_t) {
+ customAllocWriteHandler([this, self, buffer](const ASIO_ERROR&
err, size_t) {
handleSentAuthResponse(err, buffer);
}));
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 1bc1bd8..69155fd 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -23,13 +23,20 @@
#include <pulsar/defines.h>
#include <atomic>
-#include <boost/any.hpp>
+#ifdef USE_ASIO
+#include <asio/bind_executor.hpp>
+#include <asio/io_service.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/ssl/stream.hpp>
+#include <asio/strand.hpp>
+#else
#include <boost/asio/bind_executor.hpp>
-#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
+#endif
+#include <boost/any.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <functional>
@@ -37,19 +44,18 @@
#include <string>
#include <vector>
+#include "AsioTimer.h"
#include "Commands.h"
#include "GetLastMessageIdResponse.h"
#include "LookupDataResult.h"
#include "SharedBuffer.h"
+#include "TimeUtils.h"
#include "UtilAllocator.h"
-
namespace pulsar {
class PulsarFriend;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
-using TimeDuration = boost::posix_time::time_duration;
-using TcpResolverPtr = std::shared_ptr<boost::asio::ip::tcp::resolver>;
+using TcpResolverPtr = std::shared_ptr<ASIO::ip::tcp::resolver>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
@@ -114,10 +120,10 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
};
public:
- typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
- typedef
std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>>
TlsSocketPtr;
+ typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
+ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket&>>
TlsSocketPtr;
typedef std::shared_ptr<ClientConnection> ConnectionPtr;
- typedef std::function<void(const boost::system::error_code&,
ConnectionPtr)> ConnectionListener;
+ typedef std::function<void(const ASIO_ERROR&, ConnectionPtr)>
ConnectionListener;
typedef std::vector<ConnectionListener>::iterator ListenerIterator;
/*
@@ -224,17 +230,16 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
* although not usable at this point, since this is just tcp connection
* Pulsar - Connect/Connected has yet to happen
*/
- void handleTcpConnected(const boost::system::error_code& err,
- boost::asio::ip::tcp::resolver::iterator
endpointIterator);
+ void handleTcpConnected(const ASIO_ERROR& err,
ASIO::ip::tcp::resolver::iterator endpointIterator);
- void handleHandshake(const boost::system::error_code& err);
+ void handleHandshake(const ASIO_ERROR& err);
- void handleSentPulsarConnect(const boost::system::error_code& err, const
SharedBuffer& buffer);
- void handleSentAuthResponse(const boost::system::error_code& err, const
SharedBuffer& buffer);
+ void handleSentPulsarConnect(const ASIO_ERROR& err, const SharedBuffer&
buffer);
+ void handleSentAuthResponse(const ASIO_ERROR& err, const SharedBuffer&
buffer);
void readNextCommand();
- void handleRead(const boost::system::error_code& err, size_t
bytesTransferred, uint32_t minReadSize);
+ void handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t
minReadSize);
void processIncomingBuffer();
bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t&
remainingBytes,
@@ -248,19 +253,18 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
- void handleResolve(const boost::system::error_code& err,
- boost::asio::ip::tcp::resolver::iterator
endpointIterator);
+ void handleResolve(const ASIO_ERROR& err,
ASIO::ip::tcp::resolver::iterator endpointIterator);
- void handleSend(const boost::system::error_code& err, const SharedBuffer&
cmd);
- void handleSendPair(const boost::system::error_code& err);
+ void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
+ void handleSendPair(const ASIO_ERROR& err);
void sendPendingCommands();
void newLookup(const SharedBuffer& cmd, const uint64_t requestId,
LookupDataResultPromisePtr promise);
- void handleRequestTimeout(const boost::system::error_code& ec,
PendingRequestData pendingRequestData);
+ void handleRequestTimeout(const ASIO_ERROR& ec, PendingRequestData
pendingRequestData);
- void handleLookupTimeout(const boost::system::error_code&,
LookupRequestData);
+ void handleLookupTimeout(const ASIO_ERROR&, LookupRequestData);
- void handleGetLastMessageIdTimeout(const boost::system::error_code&,
LastMessageIdRequestData data);
+ void handleGetLastMessageIdTimeout(const ASIO_ERROR&,
LastMessageIdRequestData data);
void handleKeepAliveTimeout();
@@ -280,13 +284,9 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
return;
}
if (tlsSocket_) {
-#if BOOST_VERSION >= 106600
- boost::asio::async_write(*tlsSocket_, buffers,
boost::asio::bind_executor(strand_, handler));
-#else
- boost::asio::async_write(*tlsSocket_, buffers,
strand_.wrap(handler));
-#endif
+ ASIO::async_write(*tlsSocket_, buffers,
ASIO::bind_executor(strand_, handler));
} else {
- boost::asio::async_write(*socket_, buffers, handler);
+ ASIO::async_write(*socket_, buffers, handler);
}
}
@@ -296,11 +296,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
return;
}
if (tlsSocket_) {
-#if BOOST_VERSION >= 106600
- tlsSocket_->async_read_some(buffers,
boost::asio::bind_executor(strand_, handler));
-#else
- tlsSocket_->async_read_some(buffers, strand_.wrap(handler));
-#endif
+ tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_,
handler));
} else {
socket_->async_receive(buffers, handler);
}
@@ -321,11 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
-#if BOOST_VERSION >= 106600
- boost::asio::strand<boost::asio::io_service::executor_type> strand_;
-#else
- boost::asio::io_service::strand strand_;
-#endif
+ ASIO::strand<ASIO::io_service::executor_type> strand_;
const std::string logicalAddress_;
/*
@@ -343,7 +335,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
/*
* indicates if async connection establishment failed
*/
- boost::system::error_code error_;
+ ASIO_ERROR error_;
SharedBuffer incomingBuffer_;
@@ -392,8 +384,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;
- void handleConsumerStatsTimeout(const boost::system::error_code& ec,
- std::vector<uint64_t>
consumerStatsRequests);
+ void handleConsumerStatsTimeout(const ASIO_ERROR& ec,
std::vector<uint64_t> consumerStatsRequests);
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
uint32_t maxPendingLookupRequest_;
diff --git a/lib/CompressionCodec.cc b/lib/CompressionCodec.cc
index 991d52c..6105887 100644
--- a/lib/CompressionCodec.cc
+++ b/lib/CompressionCodec.cc
@@ -45,7 +45,6 @@ CompressionCodec&
CompressionCodecProvider::getCodec(CompressionType compression
default:
return compressionCodecNone_;
}
- BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType
enumeration value"));
}
SharedBuffer CompressionCodecNone::encode(const SharedBuffer& raw) { return
raw; }
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index 95170a9..4cc8883 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -18,15 +18,20 @@
*/
#include "ConnectionPool.h"
+#ifdef USE_ASIO
+#include <asio/ip/tcp.hpp>
+#include <asio/ssl.hpp>
+#else
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
+#endif
#include "ClientConnection.h"
#include "ExecutorService.h"
#include "LogUtils.h"
-using boost::asio::ip::tcp;
-namespace ssl = boost::asio::ssl;
+using ASIO::ip::tcp;
+namespace ssl = ASIO::ssl;
typedef ssl::stream<tcp::socket> ssl_socket;
DECLARE_LOG_OBJECT()
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index f9770d8..5216218 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -26,6 +26,7 @@
#include "AckGroupingTracker.h"
#include "AckGroupingTrackerDisabled.h"
#include "AckGroupingTrackerEnabled.h"
+#include "AsioDefines.h"
#include "BatchMessageAcker.h"
#include "BatchedMessageIdImpl.h"
#include "BitSet.h"
@@ -55,6 +56,9 @@ namespace pulsar {
DECLARE_LOG_OBJECT()
+using std::chrono::milliseconds;
+using std::chrono::seconds;
+
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string&
topic,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
bool isPersistent, const ConsumerInterceptorsPtr&
interceptors,
@@ -402,10 +406,9 @@ void ConsumerImpl::discardChunkMessages(std::string uuid,
MessageId messageId, b
}
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
- checkExpiredChunkedTimer_->expires_from_now(
-
boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+
checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
- checkExpiredChunkedTimer_->async_wait([this, weakSelf](const
boost::system::error_code& ec) -> void {
+ checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR&
ec) -> void {
auto self = weakSelf.lock();
if (!self) {
return;
@@ -1581,7 +1584,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const
BackoffPtr& backoff, Time
}
} else {
TimeDuration next = std::min(remainTime, backoff->next());
- if (next.total_milliseconds() <= 0) {
+ if (toMillis(next) <= 0) {
LOG_ERROR(getName() << " Client Connection not ready for
Consumer");
callback(ResultNotConnected, MessageId());
return;
@@ -1592,8 +1595,8 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const
BackoffPtr& backoff, Time
auto self = shared_from_this();
timer->async_wait([this, backoff, remainTime, timer, next, callback,
- self](const boost::system::error_code& ec) -> void {
- if (ec == boost::asio::error::operation_aborted) {
+ self](const ASIO_ERROR& ec) -> void {
+ if (ec == ASIO::error::operation_aborted) {
LOG_DEBUG(getName() << " Get last message id operation was
cancelled, code[" << ec << "].");
return;
}
@@ -1602,7 +1605,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const
BackoffPtr& backoff, Time
return;
}
LOG_WARN(getName() << " Could not get connection while
getLastMessageId -- Will try again in "
- << next.total_milliseconds() << " ms")
+ << toMillis(next) << " ms")
this->internalGetLastMessageIdAsync(backoff, remainTime, timer,
callback);
});
}
@@ -1693,7 +1696,7 @@ std::shared_ptr<ConsumerImpl>
ConsumerImpl::get_shared_this_ptr() {
}
void ConsumerImpl::cancelTimers() noexcept {
- boost::system::error_code ec;
+ ASIO_ERROR ec;
batchReceiveTimer_->cancel(ec);
checkExpiredChunkedTimer_->cancel(ec);
unAckedMessageTrackerPtr_->stop();
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 3921237..851d41e 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -51,9 +51,9 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client,
const std::string& topi
void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
if (timeoutMs > 0) {
-
batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+
batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
- batchReceiveTimer_->async_wait([weakSelf](const
boost::system::error_code& ec) {
+ batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (self && !ec) {
self->doBatchReceiveTimeTask();
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index 53be8ea..794e361 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -32,7 +32,7 @@ void ExecutorService::start() {
auto self = shared_from_this();
std::thread t{[this, self] {
LOG_DEBUG("Run io_service in a single thread");
- boost::system::error_code ec;
+ ASIO_ERROR ec;
while (!closed_) {
io_service_.restart();
IOService::work work{getIOService()};
@@ -63,22 +63,22 @@ ExecutorServicePtr ExecutorService::create() {
}
/*
- * factory method of boost::asio::ip::tcp::socket associated with io_service_
instance
+ * factory method of ASIO::ip::tcp::socket associated with io_service_
instance
* @ returns shared_ptr to this socket
*/
SocketPtr ExecutorService::createSocket() {
try {
- return SocketPtr(new boost::asio::ip::tcp::socket(io_service_));
- } catch (const boost::system::system_error &e) {
+ return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
+ } catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create socket: ") + e.what();
throw std::runtime_error(error);
}
}
-TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket,
boost::asio::ssl::context &ctx) {
- return
std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>>(
- new boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>(*socket,
ctx));
+TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket,
ASIO::ssl::context &ctx) {
+ return std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &>>(
+ new ASIO::ssl::stream<ASIO::ip::tcp::socket &>(*socket, ctx));
}
/*
@@ -87,8 +87,8 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr
&socket, boost::asio::ss
*/
TcpResolverPtr ExecutorService::createTcpResolver() {
try {
- return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_));
- } catch (const boost::system::system_error &e) {
+ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
+ } catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create resolver: ") + e.what();
throw std::runtime_error(error);
@@ -97,10 +97,10 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
try {
- return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
- } catch (const boost::system::system_error &e) {
+ return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
+ } catch (const ASIO_SYSTEM_ERROR &e) {
restart();
- auto error = std::string("Failed to create deadline_timer: ") +
e.what();
+ auto error = std::string("Failed to create steady_timer: ") + e.what();
throw std::runtime_error(error);
}
}
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index a373c0a..89d06d3 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -22,10 +22,15 @@
#include <pulsar/defines.h>
#include <atomic>
-#include <boost/asio/deadline_timer.hpp>
+#ifdef USE_ASIO
+#include <asio/io_service.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/ssl.hpp>
+#else
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
+#endif
#include <chrono>
#include <condition_variable>
#include <functional>
@@ -33,14 +38,15 @@
#include <mutex>
#include <thread>
+#include "AsioTimer.h"
+
namespace pulsar {
-typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
-typedef std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket
&> > TlsSocketPtr;
-typedef std::shared_ptr<boost::asio::ip::tcp::resolver> TcpResolverPtr;
-typedef std::shared_ptr<boost::asio::deadline_timer> DeadlineTimerPtr;
+typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
+typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> >
TlsSocketPtr;
+typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
class PULSAR_PUBLIC ExecutorService : public
std::enable_shared_from_this<ExecutorService> {
public:
- using IOService = boost::asio::io_service;
+ using IOService = ASIO::io_service;
using SharedPtr = std::shared_ptr<ExecutorService>;
static SharedPtr create();
@@ -51,7 +57,7 @@ class PULSAR_PUBLIC ExecutorService : public
std::enable_shared_from_this<Execut
// throws std::runtime_error if failed
SocketPtr createSocket();
- static TlsSocketPtr createTlsSocket(SocketPtr &socket,
boost::asio::ssl::context &ctx);
+ static TlsSocketPtr createTlsSocket(SocketPtr &socket, ASIO::ssl::context
&ctx);
// throws std::runtime_error if failed
TcpResolverPtr createTcpResolver();
// throws std::runtime_error if failed
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index b55f751..fa21a57 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -18,6 +18,7 @@
*/
#include "HandlerBase.h"
+#include "Backoff.h"
#include "ClientConnection.h"
#include "ClientImpl.h"
#include "ExecutorService.h"
@@ -36,7 +37,7 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const
std::string& topic,
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
- operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
+
operationTimeut_(std::chrono::seconds(client->conf().getOperationTimeoutSeconds())),
state_(NotStarted),
backoff_(backoff),
epoch_(0),
@@ -147,13 +148,13 @@ void HandlerBase::scheduleReconnection() {
if (state == Pending || state == Ready) {
TimeDuration delay = backoff_.next();
- LOG_INFO(getName() << "Schedule reconnection in " <<
(delay.total_milliseconds() / 1000.0) << " s");
+ LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay)
/ 1000.0) << " s");
timer_->expires_from_now(delay);
// passing shared_ptr here since time_ will get destroyed, so tasks
will be cancelled
// so we will not run into the case where grabCnx is invoked on out of
scope handler
auto name = getName();
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
- timer_->async_wait([name, weakSelf](const boost::system::error_code&
ec) {
+ timer_->async_wait([name, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (self) {
self->handleTimeout(ec);
@@ -164,7 +165,7 @@ void HandlerBase::scheduleReconnection() {
}
}
-void HandlerBase::handleTimeout(const boost::system::error_code& ec) {
+void HandlerBase::handleTimeout(const ASIO_ERROR& ec) {
if (ec) {
LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec
<< "]");
return;
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index f62c4df..68c0b6a 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -20,20 +20,17 @@
#define _PULSAR_HANDLER_BASE_HEADER_
#include <pulsar/Result.h>
-#include <boost/asio/deadline_timer.hpp>
#include <memory>
#include <mutex>
#include <string>
+#include "AsioTimer.h"
#include "Backoff.h"
#include "Future.h"
+#include "TimeUtils.h"
namespace pulsar {
-using namespace boost::posix_time;
-using boost::posix_time::milliseconds;
-using boost::posix_time::seconds;
-
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
@@ -42,7 +39,6 @@ using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
public:
@@ -95,7 +91,7 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
- void handleTimeout(const boost::system::error_code& ec);
+ void handleTimeout(const ASIO_ERROR& ec);
protected:
ClientImplWeakPtr client_;
diff --git a/lib/Int64SerDes.h b/lib/Int64SerDes.h
index dbc5d8a..f1f5eef 100644
--- a/lib/Int64SerDes.h
+++ b/lib/Int64SerDes.h
@@ -20,7 +20,12 @@
#include <stdint.h>
-#include <boost/asio.hpp> // for ntohl
+// for ntohl
+#ifdef USE_ASIO
+#include <asio/detail/socket_ops.hpp>
+#else
+#include <boost/asio/detail/socket_ops.hpp>
+#endif
namespace pulsar {
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 15f9d9b..af70623 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -18,6 +18,7 @@
*/
#include "MultiTopicsConsumerImpl.h"
+#include <chrono>
#include <stdexcept>
#include "ClientImpl.h"
@@ -37,6 +38,9 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
+using std::chrono::milliseconds;
+using std::chrono::seconds;
+
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client,
TopicNamePtr topicName,
int numPartitions, const
std::string& subscriptionName,
const ConsumerConfiguration&
conf,
@@ -90,7 +94,7 @@
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
auto partitionsUpdateInterval = static_cast<unsigned
int>(client->conf().getPartitionsUpdateInterval());
if (partitionsUpdateInterval > 0) {
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
- partitionsUpdateInterval_ =
boost::posix_time::seconds(partitionsUpdateInterval);
+ partitionsUpdateInterval_ = seconds(partitionsUpdateInterval);
lookupServicePtr_ = client->getLookup();
}
@@ -936,7 +940,7 @@ uint64_t
MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
auto weakSelf = weak_from_this();
- partitionsUpdateTimer_->async_wait([weakSelf](const
boost::system::error_code& ec) {
+ partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
// If two requests call runPartitionUpdateTask at the same time, the
timer will fail, and it
// cannot continue at this time, and the request needs to be ignored.
auto self = weakSelf.lock();
@@ -1087,7 +1091,7 @@ void
MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
void MultiTopicsConsumerImpl::cancelTimers() noexcept {
if (partitionsUpdateTimer_) {
- boost::system::error_code ec;
+ ASIO_ERROR ec;
partitionsUpdateTimer_->cancel(ec);
}
}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index d4127f6..c5834ea 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -32,6 +32,7 @@
#include "LookupDataResult.h"
#include "SynchronizedHashMap.h"
#include "TestUtil.h"
+#include "TimeUtils.h"
#include "UnboundedBlockingQueue.h"
namespace pulsar {
@@ -119,7 +120,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
std::atomic_int incomingMessagesSize_ = {0};
MessageListener messageListener_;
DeadlineTimerPtr partitionsUpdateTimer_;
- boost::posix_time::time_duration partitionsUpdateInterval_;
+ TimeDuration partitionsUpdateInterval_;
LookupServicePtr lookupServicePtr_;
std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
std::atomic<Result> failedResult{ResultOk};
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 0dd7358..e443496 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -40,9 +40,9 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr
client, ConsumerImpl &con
nackDelay_ =
std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(),
MIN_NACK_DELAY_MILLIS));
- timerInterval_ = boost::posix_time::milliseconds((long)(nackDelay_.count()
/ 3));
- LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count()
- << " ms - Timer
interval: " << timerInterval_);
+ timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3));
+ LOG_DEBUG("Created negative ack tracker with delay: " <<
nackDelay_.count() << " ms - Timer interval: "
+ <<
timerInterval_.count());
}
void NegativeAcksTracker::scheduleTimer() {
@@ -51,14 +51,14 @@ void NegativeAcksTracker::scheduleTimer() {
}
std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
timer_->expires_from_now(timerInterval_);
- timer_->async_wait([weakSelf](const boost::system::error_code &ec) {
+ timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
if (auto self = weakSelf.lock()) {
self->handleTimer(ec);
}
});
}
-void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
+void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
if (ec) {
// Ignore cancelled events
return;
@@ -107,7 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
void NegativeAcksTracker::close() {
closed_ = true;
- boost::system::error_code ec;
+ ASIO_ERROR ec;
timer_->cancel(ec);
std::lock_guard<std::mutex> lock(mutex_);
nackedMessages_.clear();
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index 4b48984..472e976 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -23,12 +23,13 @@
#include <pulsar/MessageId.h>
#include <atomic>
-#include <boost/asio/deadline_timer.hpp>
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
+#include "AsioDefines.h"
+#include "AsioTimer.h"
#include "TestUtil.h"
namespace pulsar {
@@ -36,7 +37,6 @@ namespace pulsar {
class ConsumerImpl;
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
@@ -56,13 +56,13 @@ class NegativeAcksTracker : public
std::enable_shared_from_this<NegativeAcksTrac
private:
void scheduleTimer();
- void handleTimer(const boost::system::error_code &ec);
+ void handleTimer(const ASIO_ERROR &ec);
ConsumerImpl &consumer_;
std::mutex mutex_;
std::chrono::milliseconds nackDelay_;
- boost::posix_time::milliseconds timerInterval_;
+ std::chrono::milliseconds timerInterval_;
typedef typename std::chrono::steady_clock Clock;
std::map<MessageId, Clock::time_point> nackedMessages_;
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index a1319e1..46fd9c1 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -23,8 +23,6 @@
#include <pulsar/Producer.h>
#include <pulsar/Result.h>
-#include <boost/date_time/posix_time/ptime.hpp>
-
#include "ChunkMessageIdImpl.h"
#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
@@ -53,7 +51,7 @@ struct OpSendMsg {
const int32_t numChunks;
const uint32_t messagesCount;
const uint64_t messagesSize;
- const boost::posix_time::ptime timeout;
+ const ptime timeout;
const SendCallback sendCallback;
std::vector<std::function<void(Result)>> trackerCallbacks;
ChunkMessageIdListPtr chunkMessageIdList;
@@ -98,7 +96,7 @@ struct OpSendMsg {
numChunks(metadata.num_chunks_from_msg()),
messagesCount(messagesCount),
messagesSize(messagesSize),
- timeout(TimeUtils::now() +
boost::posix_time::milliseconds(sendTimeoutMs)),
+ timeout(TimeUtils::now() + std::chrono::milliseconds(sendTimeoutMs)),
sendCallback(std::move(callback)),
chunkMessageIdList(std::move(chunkMessageIdList)),
sendArgs(new SendArguments(producerId, metadata.sequence_id(),
metadata, payload)) {}
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 54e96c8..4178096 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -58,7 +58,7 @@
PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
if (partitionsUpdateInterval > 0) {
listenerExecutor_ = client->getListenerExecutorProvider()->get();
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
- partitionsUpdateInterval_ =
boost::posix_time::seconds(partitionsUpdateInterval);
+ partitionsUpdateInterval_ =
std::chrono::seconds(partitionsUpdateInterval);
lookupServicePtr_ = client->getLookup();
}
}
@@ -69,7 +69,7 @@ MessageRoutingPolicyPtr
PartitionedProducerImpl::getMessageRouter() {
return std::make_shared<RoundRobinMessageRouter>(
conf_.getHashingScheme(), conf_.getBatchingEnabled(),
conf_.getBatchingMaxMessages(),
conf_.getBatchingMaxAllowedSizeInBytes(),
-
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+
std::chrono::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
case ProducerConfiguration::CustomPartition:
return conf_.getMessageRouterPtr();
case ProducerConfiguration::UseSinglePartition:
@@ -422,7 +422,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback
callback) {
void PartitionedProducerImpl::runPartitionUpdateTask() {
auto weakSelf = weak_from_this();
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
- partitionsUpdateTimer_->async_wait([weakSelf](const
boost::system::error_code& ec) {
+ partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (self) {
self->getPartitionMetadata();
@@ -524,7 +524,7 @@ uint64_t
PartitionedProducerImpl::getNumberOfConnectedProducer() {
void PartitionedProducerImpl::cancelTimers() noexcept {
if (partitionsUpdateTimer_) {
- boost::system::error_code ec;
+ ASIO_ERROR ec;
partitionsUpdateTimer_->cancel(ec);
}
}
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 2d07a81..610c74e 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -20,21 +20,21 @@
#include <pulsar/TopicMetadata.h>
#include <atomic>
-#include <boost/asio/deadline_timer.hpp>
#include <memory>
#include <mutex>
#include <vector>
+#include "AsioTimer.h"
#include "LookupDataResult.h"
#include "ProducerImplBase.h"
#include "ProducerInterceptors.h"
+#include "TimeUtils.h"
namespace pulsar {
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
class LookupService;
@@ -128,7 +128,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
ExecutorServicePtr listenerExecutor_;
DeadlineTimerPtr partitionsUpdateTimer_;
- boost::posix_time::time_duration partitionsUpdateInterval_;
+ TimeDuration partitionsUpdateInterval_;
LookupServicePtr lookupServicePtr_;
ProducerInterceptorsPtr interceptors_;
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc
b/lib/PatternMultiTopicsConsumerImpl.cc
index 23e445e..4fc7bb6 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -27,6 +27,8 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
+using std::chrono::seconds;
+
PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
ClientImplPtr client, const std::string pattern,
CommandGetTopicsOfNamespace_Mode getTopicsMode,
const std::vector<std::string>& topics, const std::string&
subscriptionName,
@@ -49,15 +51,15 @@ void
PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
auto weakSelf = weak_from_this();
- autoDiscoveryTimer_->async_wait([weakSelf](const
boost::system::error_code& err) {
+ autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
if (auto self = weakSelf.lock()) {
self->autoDiscoveryTimerTask(err);
}
});
}
-void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const
boost::system::error_code& err) {
- if (err == boost::asio::error::operation_aborted) {
+void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR&
err) {
+ if (err == ASIO::error::operation_aborted) {
LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
return;
} else if (err) {
@@ -228,7 +230,7 @@ void PatternMultiTopicsConsumerImpl::start() {
if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
auto weakSelf = weak_from_this();
- autoDiscoveryTimer_->async_wait([weakSelf](const
boost::system::error_code& err) {
+ autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
if (auto self = weakSelf.lock()) {
self->autoDiscoveryTimerTask(err);
}
@@ -247,6 +249,6 @@ void
PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
}
void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
- boost::system::error_code ec;
+ ASIO_ERROR ec;
autoDiscoveryTimer_->cancel(ec);
}
diff --git a/lib/PatternMultiTopicsConsumerImpl.h
b/lib/PatternMultiTopicsConsumerImpl.h
index 5d3ba9e..f272df2 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -22,6 +22,7 @@
#include <string>
#include <vector>
+#include "AsioTimer.h"
#include "LookupDataResult.h"
#include "MultiTopicsConsumerImpl.h"
#include "NamespaceName.h"
@@ -56,7 +57,7 @@ class PatternMultiTopicsConsumerImpl : public
MultiTopicsConsumerImpl {
const PULSAR_REGEX_NAMESPACE::regex getPattern();
- void autoDiscoveryTimerTask(const boost::system::error_code& err);
+ void autoDiscoveryTimerTask(const ASIO_ERROR& err);
// filter input `topics` with given `pattern`, return matched topics. Do
not match topic domain.
static NamespaceTopicsPtr topicsPatternFilter(const
std::vector<std::string>& topics,
@@ -74,7 +75,7 @@ class PatternMultiTopicsConsumerImpl : public
MultiTopicsConsumerImpl {
const std::string patternString_;
const PULSAR_REGEX_NAMESPACE::regex pattern_;
const CommandGetTopicsOfNamespace_Mode getTopicsMode_;
- typedef std::shared_ptr<boost::asio::deadline_timer> TimerPtr;
+ typedef std::shared_ptr<ASIO::steady_timer> TimerPtr;
TimerPtr autoDiscoveryTimer_;
bool autoDiscoveryRunning_;
NamespaceNamePtr namespaceName_;
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 6046eae..9fde012 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -18,6 +18,8 @@
*/
#include "PeriodicTask.h"
+#include <chrono>
+
namespace pulsar {
void PeriodicTask::start() {
@@ -27,7 +29,7 @@ void PeriodicTask::start() {
state_ = Ready;
if (periodMs_ >= 0) {
std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
- timer_->expires_from_now(boost::posix_time::millisec(periodMs_));
+ timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
timer_->async_wait([weakSelf](const ErrorCode& ec) {
auto self = weakSelf.lock();
if (self) {
@@ -48,7 +50,7 @@ void PeriodicTask::stop() noexcept {
}
void PeriodicTask::handleTimeout(const ErrorCode& ec) {
- if (state_ != Ready || ec.value() ==
boost::system::errc::operation_canceled) {
+ if (state_ != Ready || ec == ASIO::error::operation_aborted) {
return;
}
@@ -57,7 +59,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
// state_ may be changed in handleTimeout, so we check state_ again
if (state_ == Ready) {
auto self = shared_from_this();
- timer_->expires_from_now(boost::posix_time::millisec(periodMs_));
+ timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
timer_->async_wait([this, self](const ErrorCode& ec) {
handleTimeout(ec); });
}
}
diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h
index 5de81ae..bc18634 100644
--- a/lib/PeriodicTask.h
+++ b/lib/PeriodicTask.h
@@ -36,7 +36,7 @@ namespace pulsar {
*/
class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
public:
- using ErrorCode = boost::system::error_code;
+ using ErrorCode = ASIO_ERROR;
using CallbackType = std::function<void(const ErrorCode&)>;
enum State : std::uint8_t
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 0a12925..fc39b23 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -20,7 +20,7 @@
#include <pulsar/MessageIdBuilder.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <chrono>
#include "BatchMessageContainer.h"
#include "BatchMessageKeyBasedContainer.h"
@@ -46,6 +46,8 @@
namespace pulsar {
DECLARE_LOG_OBJECT()
+using std::chrono::milliseconds;
+
ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
const ProducerConfiguration& conf, const
ProducerInterceptorsPtr& interceptors,
int32_t partition, bool retryOnCreationError)
@@ -465,7 +467,7 @@ void ProducerImpl::sendAsync(const Message& msg,
SendCallback callback) {
Producer producer = Producer(shared_from_this());
auto interceptorMessage = interceptors_->beforeSend(producer, msg);
- const auto now = boost::posix_time::microsec_clock::universal_time();
+ const auto now = TimeUtils::now();
auto self = shared_from_this();
sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback,
producer, interceptorMessage](
Result result, const
MessageId& messageId) {
@@ -564,10 +566,9 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, SendCallback&& c
bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
bool isFull = batchMessageContainer_->add(msg, callback);
if (isFirstMessage) {
- batchTimer_->expires_from_now(
-
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+
batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
auto weakSelf = weak_from_this();
- batchTimer_->async_wait([this, weakSelf](const
boost::system::error_code& ec) {
+ batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
@@ -824,14 +825,14 @@ Future<Result, ProducerImplBaseWeakPtr>
ProducerImpl::getProducerCreatedFuture()
uint64_t ProducerImpl::getProducerId() const { return producerId_; }
-void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
+void ProducerImpl::handleSendTimeout(const ASIO_ERROR& err) {
const auto state = state_.load();
if (state != Pending && state != Ready) {
return;
}
Lock lock(mutex_);
- if (err == boost::asio::error::operation_aborted) {
+ if (err == ASIO::error::operation_aborted) {
LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
return;
} else if (err) {
@@ -847,8 +848,8 @@ void ProducerImpl::handleSendTimeout(const
boost::system::error_code& err) {
} else {
// If there is at least one message, calculate the diff between the
message timeout and
// the current time.
- time_duration diff = pendingMessagesQueue_.front()->timeout -
TimeUtils::now();
- if (diff.total_milliseconds() <= 0) {
+ auto diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now();
+ if (toMillis(diff) <= 0) {
// The diff is less than or equal to zero, meaning that the
message has been expired.
LOG_DEBUG(getName() << "Timer expired. Calling timeout
callbacks.");
pendingMessages = getPendingCallbacksWhenFailed();
@@ -856,7 +857,7 @@ void ProducerImpl::handleSendTimeout(const
boost::system::error_code& err) {
asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
} else {
// The diff is greater than zero, set the timeout to the diff value
- LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new
timeout " << diff);
+ LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new
timeout " << diff.count());
asyncWaitSendTimeout(diff);
}
}
@@ -1000,7 +1001,7 @@ void ProducerImpl::shutdown() {
void ProducerImpl::cancelTimers() noexcept {
dataKeyRefreshTask_.stop();
- boost::system::error_code ec;
+ ASIO_ERROR ec;
batchTimer_->cancel(ec);
sendTimer_->cancel(ec);
}
@@ -1026,7 +1027,7 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType
expiryTime) {
sendTimer_->expires_from_now(expiryTime);
auto weakSelf = weak_from_this();
- sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+ sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
if (self) {
std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index b467458..9781605 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -19,6 +19,12 @@
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_
+#include "TimeUtils.h"
+#ifdef USE_ASIO
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/steady_timer.hpp>
+#endif
#include <atomic>
#include <boost/optional.hpp>
#include <list>
@@ -30,6 +36,7 @@
#if defined(_MSC_VER) || defined(__APPLE__)
#include "OpSendMsg.h"
#endif
+#include "AsioDefines.h"
#include "PendingFailures.h"
#include "PeriodicTask.h"
#include "ProducerImplBase.h"
@@ -39,7 +46,7 @@ namespace pulsar {
class BatchMessageContainerBase;
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
class MessageCrypto;
using MessageCryptoPtr = std::shared_ptr<MessageCrypto>;
class ProducerImpl;
@@ -137,7 +144,7 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
void resendMessages(ClientConnectionPtr cnx);
- void refreshEncryptionKey(const boost::system::error_code& ec);
+ void refreshEncryptionKey(const ASIO_ERROR& ec);
bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer&
payload,
SharedBuffer& encryptedPayload);
@@ -183,8 +190,8 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
std::string schemaVersion_;
DeadlineTimerPtr sendTimer_;
- void handleSendTimeout(const boost::system::error_code& err);
- using DurationType = typename boost::asio::deadline_timer::duration_type;
+ void handleSendTimeout(const ASIO_ERROR& err);
+ using DurationType = TimeDuration;
void asyncWaitSendTimeout(DurationType expiryTime);
Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index d026e42..9c920da 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -30,6 +30,7 @@
#include "Future.h"
#include "LogUtils.h"
#include "ResultUtils.h"
+#include "TimeUtils.h"
namespace pulsar {
@@ -43,9 +44,8 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
DeadlineTimerPtr timer)
: name_(name),
func_(std::move(func)),
- timeout_(boost::posix_time::seconds(timeoutSeconds)),
- backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
- boost::posix_time::milliseconds(0)),
+ timeout_(std::chrono::seconds(timeoutSeconds)),
+ backoff_(std::chrono::milliseconds(100), timeout_ + timeout_,
std::chrono::milliseconds(0)),
timer_(timer) {}
public:
@@ -67,7 +67,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
void cancel() {
promise_.setFailed(ResultDisconnected);
- boost::system::error_code ec;
+ ASIO_ERROR ec;
timer_->cancel(ec);
}
@@ -100,7 +100,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
promise_.setFailed(result);
return;
}
- if (remainingTime.total_milliseconds() <= 0) {
+ if (toMillis(remainingTime) <= 0) {
promise_.setFailed(ResultTimeout);
return;
}
@@ -109,24 +109,23 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
timer_->expires_from_now(delay);
auto nextRemainingTime = remainingTime - delay;
- LOG_INFO("Reschedule " << name_ << " for " <<
delay.total_milliseconds()
- << " ms, remaining time: " <<
nextRemainingTime.total_milliseconds()
- << " ms");
- timer_->async_wait([this, weakSelf, nextRemainingTime](const
boost::system::error_code& ec) {
+ LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay)
+ << " ms, remaining time: " <<
toMillis(nextRemainingTime) << " ms");
+ timer_->async_wait([this, weakSelf, nextRemainingTime](const
ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (ec) {
- if (ec == boost::asio::error::operation_aborted) {
+ if (ec == ASIO::error::operation_aborted) {
LOG_DEBUG("Timer for " << name_ << " is cancelled");
promise_.setFailed(ResultTimeout);
} else {
LOG_WARN("Timer for " << name_ << " failed: " <<
ec.message());
}
} else {
- LOG_DEBUG("Run operation " << name_ << ", remaining time: "
- <<
nextRemainingTime.total_milliseconds() << " ms");
+ LOG_DEBUG("Run operation " << name_ << ", remaining time:
" << toMillis(nextRemainingTime)
+ << " ms");
runImpl(nextRemainingTime);
}
});
diff --git a/lib/RoundRobinMessageRouter.cc b/lib/RoundRobinMessageRouter.cc
index 9693cc2..1bc0b30 100644
--- a/lib/RoundRobinMessageRouter.cc
+++ b/lib/RoundRobinMessageRouter.cc
@@ -26,8 +26,7 @@
namespace pulsar {
RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme
hashingScheme,
bool batchingEnabled,
uint32_t maxBatchingMessages,
- uint32_t maxBatchingSize,
-
boost::posix_time::time_duration maxBatchingDelay)
+ uint32_t maxBatchingSize,
TimeDuration maxBatchingDelay)
: MessageRouterBase(hashingScheme),
batchingEnabled_(batchingEnabled),
maxBatchingMessages_(maxBatchingMessages),
@@ -74,7 +73,7 @@ int RoundRobinMessageRouter::getPartition(const Message& msg,
const TopicMetadat
int64_t now = TimeUtils::currentTimeMillis();
if (messageCount >= maxBatchingMessages_ || (messageSize >=
maxBatchingSize_ - batchSize) ||
- (now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds()))
{
+ (now - lastPartitionChange >= toMillis(maxBatchingDelay_))) {
uint32_t currentPartitionCursor = ++currentPartitionCursor_;
lastPartitionChange_ = now;
cumulativeBatchSize_ = messageSize;
diff --git a/lib/RoundRobinMessageRouter.h b/lib/RoundRobinMessageRouter.h
index 753573a..03cfac8 100644
--- a/lib/RoundRobinMessageRouter.h
+++ b/lib/RoundRobinMessageRouter.h
@@ -23,16 +23,16 @@
#include <pulsar/TopicMetadata.h>
#include <atomic>
-#include <boost/date_time/posix_time/posix_time.hpp>
#include "MessageRouterBase.h"
+#include "TimeUtils.h"
namespace pulsar {
class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase {
public:
RoundRobinMessageRouter(ProducerConfiguration::HashingScheme
hashingScheme, bool batchingEnabled,
uint32_t maxBatchingMessages, uint32_t
maxBatchingSize,
- boost::posix_time::time_duration maxBatchingDelay);
+ TimeDuration maxBatchingDelay);
virtual ~RoundRobinMessageRouter();
virtual int getPartition(const Message& msg, const TopicMetadata&
topicMetadata);
@@ -40,7 +40,7 @@ class PULSAR_PUBLIC RoundRobinMessageRouter : public
MessageRouterBase {
const bool batchingEnabled_;
const uint32_t maxBatchingMessages_;
const uint32_t maxBatchingSize_;
- const boost::posix_time::time_duration maxBatchingDelay_;
+ const TimeDuration maxBatchingDelay_;
std::atomic<uint32_t> currentPartitionCursor_;
std::atomic<int64_t> lastPartitionChange_;
diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h
index 7ee2618..26fc59e 100644
--- a/lib/SharedBuffer.h
+++ b/lib/SharedBuffer.h
@@ -22,12 +22,19 @@
#include <assert.h>
#include <array>
+#ifdef USE_ASIO
+#include <asio/buffer.hpp>
+#include <asio/detail/socket_ops.hpp>
+#else
#include <boost/asio/buffer.hpp>
#include <boost/asio/detail/socket_ops.hpp>
+#endif
#include <memory>
#include <string>
#include <utility>
+#include "AsioDefines.h"
+
namespace pulsar {
class SharedBuffer {
@@ -144,13 +151,13 @@ class SharedBuffer {
inline bool writable() const { return writableBytes() > 0; }
- boost::asio::const_buffers_1 const_asio_buffer() const {
- return boost::asio::const_buffers_1(ptr_ + readIdx_, readableBytes());
+ ASIO::const_buffers_1 const_asio_buffer() const {
+ return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes());
}
- boost::asio::mutable_buffers_1 asio_buffer() {
+ ASIO::mutable_buffers_1 asio_buffer() {
assert(data_);
- return boost::asio::buffer(ptr_ + writeIdx_, writableBytes());
+ return ASIO::buffer(ptr_ + writeIdx_, writableBytes());
}
void write(const char* data, uint32_t size) {
@@ -239,17 +246,17 @@ class CompositeSharedBuffer {
}
// Implement the ConstBufferSequence requirements.
- typedef boost::asio::const_buffer value_type;
- typedef boost::asio::const_buffer* iterator;
- typedef const boost::asio::const_buffer* const_iterator;
+ typedef ASIO::const_buffer value_type;
+ typedef ASIO::const_buffer* iterator;
+ typedef const ASIO::const_buffer* const_iterator;
- const boost::asio::const_buffer* begin() const { return
&(asioBuffers_.at(0)); }
+ const ASIO::const_buffer* begin() const { return &(asioBuffers_.at(0)); }
- const boost::asio::const_buffer* end() const { return begin() + Size; }
+ const ASIO::const_buffer* end() const { return begin() + Size; }
private:
std::array<SharedBuffer, Size> sharedBuffers_;
- std::array<boost::asio::const_buffer, Size> asioBuffers_;
+ std::array<ASIO::const_buffer, Size> asioBuffers_;
};
typedef CompositeSharedBuffer<2> PairSharedBuffer;
diff --git a/lib/TimeUtils.h b/lib/TimeUtils.h
index a55773d..03f563c 100644
--- a/lib/TimeUtils.h
+++ b/lib/TimeUtils.h
@@ -21,19 +21,23 @@
#include <pulsar/defines.h>
#include <atomic>
-#include <boost/date_time/posix_time/posix_time.hpp>
#include <chrono>
namespace pulsar {
-using namespace boost::posix_time;
-using boost::posix_time::milliseconds;
-using boost::posix_time::seconds;
+using ptime = decltype(std::chrono::high_resolution_clock::now());
+using TimeDuration = std::chrono::nanoseconds;
+
+inline decltype(std::chrono::milliseconds(0).count()) toMillis(TimeDuration
duration) {
+ return
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
+}
class PULSAR_PUBLIC TimeUtils {
public:
- static ptime now();
- static int64_t currentTimeMillis();
+ static ptime now() { return std::chrono::high_resolution_clock::now(); }
+ static int64_t currentTimeMillis() {
+ return
std::chrono::duration_cast<std::chrono::milliseconds>(now().time_since_epoch()).count();
+ }
};
// This class processes a timeout with the following semantics:
diff --git a/lib/UnAckedMessageTrackerEnabled.cc
b/lib/UnAckedMessageTrackerEnabled.cc
index 061a140..e371af9 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -34,9 +34,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
timeoutHandlerHelper();
ExecutorServicePtr executorService =
client_->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
-
timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
+ timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_));
std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
- timer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+ timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (self && !ec) {
self->timeoutHandler();
@@ -173,7 +173,7 @@ void UnAckedMessageTrackerEnabled::clear() {
}
void UnAckedMessageTrackerEnabled::stop() {
- boost::system::error_code ec;
+ ASIO_ERROR ec;
if (timer_) {
timer_->cancel(ec);
}
diff --git a/lib/UnAckedMessageTrackerEnabled.h
b/lib/UnAckedMessageTrackerEnabled.h
index 6181a8a..83edc4c 100644
--- a/lib/UnAckedMessageTrackerEnabled.h
+++ b/lib/UnAckedMessageTrackerEnabled.h
@@ -18,13 +18,13 @@
*/
#ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
#define LIB_UNACKEDMESSAGETRACKERENABLED_H_
-#include <boost/asio/deadline_timer.hpp>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <set>
+#include "AsioTimer.h"
#include "TestUtil.h"
#include "UnAckedMessageTrackerInterface.h"
@@ -33,7 +33,6 @@ namespace pulsar {
class ClientImpl;
class ConsumerImplBase;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class UnAckedMessageTrackerEnabled : public
std::enable_shared_from_this<UnAckedMessageTrackerEnabled>,
public UnAckedMessageTrackerInterface {
diff --git a/lib/auth/athenz/ZTSClient.cc b/lib/auth/athenz/ZTSClient.cc
index 230713e..35387d9 100644
--- a/lib/auth/athenz/ZTSClient.cc
+++ b/lib/auth/athenz/ZTSClient.cc
@@ -44,8 +44,6 @@ namespace ptree = boost::property_tree;
#pragma clang diagnostic ignored "-Wunknown-warning-option"
#endif
-#include <boost/xpressive/xpressive.hpp>
-
#if defined(__clang__)
#pragma clang diagnostic pop
#endif
diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc
index 056dbf6..0eefabd 100644
--- a/lib/stats/ConsumerStatsImpl.cc
+++ b/lib/stats/ConsumerStatsImpl.cc
@@ -46,7 +46,7 @@ ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl&
stats)
totalAckedMsgMap_(stats.totalAckedMsgMap_),
statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {}
-void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
+void ConsumerStatsImpl::flushAndReset(const ASIO_ERROR& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
return;
@@ -85,9 +85,9 @@ void ConsumerStatsImpl::messageAcknowledged(Result res,
CommandAck_AckType ackTy
}
void ConsumerStatsImpl::scheduleTimer() {
-
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
+ timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
- timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
+ timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h
index 44f927f..03f3a47 100644
--- a/lib/stats/ConsumerStatsImpl.h
+++ b/lib/stats/ConsumerStatsImpl.h
@@ -20,17 +20,16 @@
#ifndef PULSAR_CONSUMER_STATS_IMPL_H_
#define PULSAR_CONSUMER_STATS_IMPL_H_
-#include <boost/asio/deadline_timer.hpp>
#include <map>
#include <memory>
#include <mutex>
#include <utility>
#include "ConsumerStatsBase.h"
+#include "lib/AsioTimer.h"
#include "lib/ExecutorService.h"
namespace pulsar {
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
@@ -58,7 +57,7 @@ class ConsumerStatsImpl : public
std::enable_shared_from_this<ConsumerStatsImpl>
public:
ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int);
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
- void flushAndReset(const boost::system::error_code&);
+ void flushAndReset(const ASIO_ERROR&);
void start() override;
void receivedMessage(Message&, Result) override;
void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums)
override;
diff --git a/lib/stats/ProducerStatsBase.h b/lib/stats/ProducerStatsBase.h
index fe0ba0a..b24266e 100644
--- a/lib/stats/ProducerStatsBase.h
+++ b/lib/stats/ProducerStatsBase.h
@@ -22,14 +22,14 @@
#include <pulsar/Message.h>
#include <pulsar/Result.h>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include "lib/TimeUtils.h"
namespace pulsar {
class ProducerStatsBase {
public:
virtual void start() {}
virtual void messageSent(const Message& msg) = 0;
- virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0;
+ virtual void messageReceived(Result, const ptime&) = 0;
virtual ~ProducerStatsBase(){};
};
diff --git a/lib/stats/ProducerStatsDisabled.h
b/lib/stats/ProducerStatsDisabled.h
index df1df0f..df1da78 100644
--- a/lib/stats/ProducerStatsDisabled.h
+++ b/lib/stats/ProducerStatsDisabled.h
@@ -25,7 +25,7 @@ namespace pulsar {
class ProducerStatsDisabled : public ProducerStatsBase {
public:
virtual void messageSent(const Message& msg){};
- virtual void messageReceived(Result, const boost::posix_time::ptime&){};
+ virtual void messageReceived(Result, const ptime&){};
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_STATS_DISABLED_HEADER
diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc
index 3d3629d..15e9e67 100644
--- a/lib/stats/ProducerStatsImpl.cc
+++ b/lib/stats/ProducerStatsImpl.cc
@@ -20,9 +20,11 @@
#include "ProducerStatsImpl.h"
#include <array>
+#include <chrono>
#include "lib/ExecutorService.h"
#include "lib/LogUtils.h"
+#include "lib/TimeUtils.h"
#include "lib/Utils.h"
namespace pulsar {
@@ -65,7 +67,7 @@ ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl&
stats)
void ProducerStatsImpl::start() { scheduleTimer(); }
-void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
+void ProducerStatsImpl::flushAndReset(const ASIO_ERROR& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
return;
@@ -93,9 +95,10 @@ void ProducerStatsImpl::messageSent(const Message& msg) {
totalBytesSent_ += msg.getLength();
}
-void ProducerStatsImpl::messageReceived(Result res, const
boost::posix_time::ptime& publishTime) {
- boost::posix_time::ptime currentTime =
boost::posix_time::microsec_clock::universal_time();
- double diffInMicros = (currentTime - publishTime).total_microseconds();
+void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) {
+ auto currentTime = TimeUtils::now();
+ double diffInMicros =
+ std::chrono::duration_cast<std::chrono::microseconds>(currentTime -
publishTime).count();
std::lock_guard<std::mutex> lock(mutex_);
totalLatencyAccumulator_(diffInMicros);
latencyAccumulator_(diffInMicros);
@@ -106,9 +109,9 @@ void ProducerStatsImpl::messageReceived(Result res, const
boost::posix_time::pti
ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }
void ProducerStatsImpl::scheduleTimer() {
-
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
+ timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
- timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
+ timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h
index 8cd1099..5d445c6 100644
--- a/lib/stats/ProducerStatsImpl.h
+++ b/lib/stats/ProducerStatsImpl.h
@@ -30,20 +30,18 @@
#include <boost/accumulators/framework/features.hpp>
#include <boost/accumulators/statistics.hpp>
#include <boost/accumulators/statistics/extended_p_square.hpp>
-#include <boost/asio/deadline_timer.hpp>
-#include <boost/date_time/local_time/local_time.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>
#include "ProducerStatsBase.h"
+#include "lib/AsioTimer.h"
namespace pulsar {
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
typedef boost::accumulators::accumulator_set<
double,
@@ -83,11 +81,11 @@ class ProducerStatsImpl : public
std::enable_shared_from_this<ProducerStatsImpl>
void start() override;
- void flushAndReset(const boost::system::error_code&);
+ void flushAndReset(const ASIO_ERROR&);
void messageSent(const Message&) override;
- void messageReceived(Result, const boost::posix_time::ptime&) override;
+ void messageReceived(Result, const ptime&) override;
~ProducerStatsImpl();
diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc
index 9fd048b..b091f97 100644
--- a/tests/AuthPluginTest.cc
+++ b/tests/AuthPluginTest.cc
@@ -21,9 +21,14 @@
#include <pulsar/Client.h>
#include <boost/algorithm/string.hpp>
+#ifdef USE_ASIO
+#include <asio.hpp>
+#else
#include <boost/asio.hpp>
+#endif
#include <thread>
+#include "lib/AsioDefines.h"
#include "lib/Future.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
@@ -287,10 +292,9 @@ namespace testAthenz {
std::string principalToken;
void mockZTS(Latch& latch, int port) {
LOG_INFO("-- MockZTS started");
- boost::asio::io_service io;
- boost::asio::ip::tcp::iostream stream;
- boost::asio::ip::tcp::acceptor acceptor(io,
-
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port));
+ ASIO::io_service io;
+ ASIO::ip::tcp::iostream stream;
+ ASIO::ip::tcp::acceptor acceptor(io,
ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port));
LOG_INFO("-- MockZTS waiting for connnection");
latch.countdown();
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index a04da08..7595f44 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -22,7 +22,6 @@
#include <pulsar/Client.h>
#include <boost/algorithm/string.hpp>
-#include <boost/asio.hpp>
#include <fstream>
#include <streambuf>
#include <string>
diff --git a/tests/BackoffTest.cc b/tests/BackoffTest.cc
index d066b94..5fe4f71 100644
--- a/tests/BackoffTest.cc
+++ b/tests/BackoffTest.cc
@@ -26,42 +26,42 @@
#include "lib/stats/ProducerStatsImpl.h"
using namespace pulsar;
-using boost::posix_time::milliseconds;
-using boost::posix_time::seconds;
+using std::chrono::milliseconds;
+using std::chrono::seconds;
static bool checkExactAndDecrementTimer(Backoff& backoff, const unsigned int&
t2) {
- const unsigned int& t1 = backoff.next().total_milliseconds();
- boost::posix_time::ptime& firstBackOffTime =
PulsarFriend::getFirstBackoffTime(backoff);
+ auto t1 = toMillis(backoff.next());
+ auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
firstBackOffTime -= milliseconds(t2);
return t1 == t2;
}
static bool withinTenPercentAndDecrementTimer(Backoff& backoff, const unsigned
int& t2) {
- const unsigned int& t1 = backoff.next().total_milliseconds();
- boost::posix_time::ptime& firstBackOffTime =
PulsarFriend::getFirstBackoffTime(backoff);
+ auto t1 = toMillis(backoff.next());
+ auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
firstBackOffTime -= milliseconds(t2);
return (t1 >= t2 * 0.9 && t1 <= t2);
}
TEST(BackoffTest, mandatoryStopTestNegativeTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
- ASSERT_EQ(backoff.next().total_milliseconds(), 100);
- backoff.next().total_milliseconds(); // 200
- backoff.next().total_milliseconds(); // 400
- backoff.next().total_milliseconds(); // 800
+ ASSERT_EQ(toMillis(backoff.next()), 100);
+ backoff.next(); // 200
+ backoff.next(); // 400
+ backoff.next(); // 800
ASSERT_FALSE(withinTenPercentAndDecrementTimer(backoff, 400));
}
TEST(BackoffTest, firstBackoffTimerTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
- ASSERT_EQ(backoff.next().total_milliseconds(), 100);
- boost::posix_time::ptime firstBackOffTime =
PulsarFriend::getFirstBackoffTime(backoff);
+ ASSERT_EQ(toMillis(backoff.next()), 100);
+ auto firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
TimeDuration diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff)
- firstBackOffTime;
ASSERT_EQ(diffBackOffTime, milliseconds(0)); // no change since reset not
called
backoff.reset();
- ASSERT_EQ(backoff.next().total_milliseconds(), 100);
+ ASSERT_EQ(toMillis(backoff.next()), 100);
diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) -
firstBackOffTime;
ASSERT_TRUE(diffBackOffTime >= milliseconds(300) && diffBackOffTime <
seconds(1));
}
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index c7e98bc..f97457f 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -955,7 +955,7 @@ TEST(ConsumerTest,
testGetLastMessageIdBlockWhenConnectionDisconnected) {
auto elapsed = TimeUtils::now() - start;
// getLastMessageIdAsync should be blocked until operationTimeout when the
connection is disconnected.
- ASSERT_GE(elapsed.seconds(), operationTimeout);
+
ASSERT_GE(std::chrono::duration_cast<std::chrono::seconds>(elapsed).count(),
operationTimeout);
}
TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index c2863e8..7377884 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -170,7 +170,7 @@ class PulsarFriend {
handler.connection_ = conn;
}
- static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
+ static auto getFirstBackoffTime(Backoff& backoff) ->
decltype(backoff.firstBackoffTime_)& {
return backoff.firstBackoffTime_;
}
diff --git a/tests/RoundRobinMessageRouterTest.cc
b/tests/RoundRobinMessageRouterTest.cc
index 56a7605..145c45a 100644
--- a/tests/RoundRobinMessageRouterTest.cc
+++ b/tests/RoundRobinMessageRouterTest.cc
@@ -31,7 +31,7 @@ TEST(RoundRobinMessageRouterTest, onePartition) {
const int numPartitions = 1;
RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1,
1,
- boost::posix_time::milliseconds(0));
+ std::chrono::milliseconds(0));
Message msg1 =
MessageBuilder().setPartitionKey("my-key-1").setContent("one").build();
Message msg2 =
MessageBuilder().setPartitionKey("my-key-2").setContent("two").build();
@@ -49,7 +49,7 @@ TEST(RoundRobinMessageRouterTest, sameKey) {
const int numPartitions = 13;
RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1,
1,
- boost::posix_time::milliseconds(0));
+ std::chrono::milliseconds(0));
Message msg1 =
MessageBuilder().setPartitionKey("my-key").setContent("one").build();
Message msg2 =
MessageBuilder().setPartitionKey("my-key").setContent("two").build();
@@ -63,7 +63,7 @@ TEST(RoundRobinMessageRouterTest, batchingDisabled) {
const int numPartitions = 13;
RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1,
1,
- boost::posix_time::milliseconds(0));
+ std::chrono::milliseconds(0));
Message msg1 = MessageBuilder().setContent("one").build();
Message msg2 = MessageBuilder().setContent("two").build();
@@ -77,7 +77,7 @@ TEST(RoundRobinMessageRouterTest, batchingEnabled) {
const int numPartitions = 13;
RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true,
1000, 100000,
- boost::posix_time::seconds(1));
+ std::chrono::seconds(1));
int p = -1;
for (int i = 0; i < 100; i++) {
@@ -96,7 +96,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) {
const int numPartitions = 13;
RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true,
1000, 100000,
- boost::posix_time::seconds(1));
+ std::chrono::seconds(1));
int p1 = -1;
for (int i = 0; i < 100; i++) {
@@ -132,8 +132,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) {
TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) {
const int numPartitions = 13;
- RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2,
1000,
- boost::posix_time::seconds(1));
+ RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2,
1000, std::chrono::seconds(1));
Message msg1 = MessageBuilder().setContent("one").build();
Message msg2 = MessageBuilder().setContent("two").build();
@@ -150,8 +149,7 @@ TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) {
TEST(RoundRobinMessageRouterTest, maxBatchSize) {
const int numPartitions = 13;
- RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10,
8,
- boost::posix_time::seconds(1));
+ RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10,
8, std::chrono::seconds(1));
Message msg1 = MessageBuilder().setContent("one").build();
Message msg2 = MessageBuilder().setContent("two").build();
diff --git a/vcpkg.json b/vcpkg.json
index d023a40..5ff4410 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -5,45 +5,20 @@
"builtin-baseline": "b051745c68faa6f65c493371d564c4eb8af34dad",
"dependencies": [
{
- "name": "boost-accumulators",
- "version>=": "1.83.0"
- },
- {
- "name": "boost-algorithm",
- "version>=": "1.83.0"
- },
- {
- "name": "boost-any",
- "version>=": "1.83.0"
- },
- {
- "name": "boost-asio",
- "version>=": "1.83.0"
- },
- {
- "name": "boost-circular-buffer",
- "version>=": "1.83.0"
- },
- {
- "name": "boost-date-time",
- "version>=": "1.83.0"
+ "name": "asio",
+ "features": [
+ "openssl"
+ ],
+ "version>=": "1.28.2"
},
{
- "name": "boost-predef",
+ "name": "boost-accumulators",
"version>=": "1.83.0"
},
{
"name": "boost-property-tree",
"version>=": "1.83.0"
},
- {
- "name": "boost-serialization",
- "version>=": "1.83.0"
- },
- {
- "name": "boost-xpressive",
- "version>=": "1.83.0"
- },
{
"name": "curl",
"default-features": false,