This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 1657635401d9718a3b85479391265d22ead51290
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Oct 4 13:01:26 2025 +0800

    Adapt to latest Asio APIs (Asio 1.32 or Boost.Asio 1.88) (#477)
    
    (cherry picked from commit f2f9f65683123cfcdcefff0dde669969a476b347)
---
 .github/workflows/ci-pr-validation.yaml |   5 ++
 CMakeLists.txt                          |  10 ++-
 lib/AckGroupingTrackerEnabled.cc        |   5 +-
 lib/AsioTimer.h                         |   9 +++
 lib/ClientConnection.cc                 | 118 ++++++++++++--------------------
 lib/ClientConnection.h                  |  10 +--
 lib/ConsumerImpl.cc                     |   9 ++-
 lib/ConsumerImplBase.cc                 |   2 +-
 lib/ExecutorService.cc                  |  37 +++++-----
 lib/ExecutorService.h                   |  12 ++--
 lib/HandlerBase.cc                      |  15 ++--
 lib/JsonUtils.h                         |   7 --
 lib/MultiTopicsConsumerImpl.cc          |   7 +-
 lib/Murmur3_32Hash.cc                   |   6 +-
 lib/NegativeAcksTracker.cc              |   5 +-
 lib/PartitionedProducerImpl.cc          |   5 +-
 lib/PatternMultiTopicsConsumerImpl.cc   |   9 +--
 lib/PeriodicTask.cc                     |   7 +-
 lib/ProducerImpl.cc                     |  11 ++-
 lib/RetryableOperation.h                |   5 +-
 lib/SharedBuffer.h                      |   6 +-
 lib/UnAckedMessageTrackerEnabled.cc     |   5 +-
 lib/checksum/crc32c_sse42.cc            |  12 +---
 lib/stats/ConsumerStatsImpl.cc          |   4 +-
 lib/stats/ConsumerStatsImpl.h           |   5 +-
 lib/stats/ProducerStatsImpl.cc          |   4 +-
 lib/stats/ProducerStatsImpl.h           |   7 +-
 tests/AuthPluginTest.cc                 |   2 +-
 tests/ConsumerTest.h                    |   4 +-
 vcpkg                                   |   2 +-
 vcpkg.json                              |  23 ++++---
 31 files changed, 158 insertions(+), 210 deletions(-)

diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index 157cf25..4efebfc 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -142,6 +142,11 @@ jobs:
       - name: Run unit tests
         run: RETRY_FAILED=3 CMAKE_BUILD_DIRECTORY=./build ./run-unit-tests.sh
 
+      - name: Build with Boost.Asio
+        run: |
+          cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON
+          cmake --build build-boost-asio -j8
+
       - name: Build perf tools
         run: |
           cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON
diff --git a/CMakeLists.txt b/CMakeLists.txt
index de9a245..792c2ef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -19,15 +19,19 @@
 
 cmake_minimum_required(VERSION 3.13)
 
-option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
-
 option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
 if (INTEGRATE_VCPKG)
-    set(USE_ASIO ON)
+    option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
     if (NOT CMAKE_TOOLCHAIN_FILE)
         set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
     endif ()
+    if (NOT USE_ASIO)
+        list(APPEND VCPKG_MANIFEST_FEATURES "boost-asio")
+    endif ()
+else ()
+    option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
 endif ()
+message(STATUS "USE_ASIO: ${USE_ASIO}")
 
 option(BUILD_TESTS "Build tests" ON)
 message(STATUS "BUILD_TESTS:  " ${BUILD_TESTS})
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 72f46f2..d88426e 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -121,8 +121,7 @@ AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
     this->flush();
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     if (this->timer_) {
-        ASIO_ERROR ec;
-        this->timer_->cancel(ec);
+        cancelTimer(*this->timer_);
     }
 }
 
@@ -172,7 +171,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
 
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     this->timer_ = this->executor_->createDeadlineTimer();
-    this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
+    this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
     std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
     this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
         auto self = weakSelf.lock();
diff --git a/lib/AsioTimer.h b/lib/AsioTimer.h
index d0c3de5..56cd353 100644
--- a/lib/AsioTimer.h
+++ b/lib/AsioTimer.h
@@ -29,3 +29,12 @@
 #include "AsioDefines.h"
 
 using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
+
+inline void cancelTimer(ASIO::steady_timer& timer) {
+    try {
+        timer.cancel();
+    } catch (const ASIO_SYSTEM_ERROR& ignored) {
+        // Most of the time the exception can be ignored unless the following logic depends on the fact that
+        // the timer is cancelled.
+    }
+}
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 9c7537a..49b2cbc 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -41,6 +41,14 @@
 #include "auth/InitialAuthData.h"
 #include "checksum/ChecksumProvider.h"
 
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/ssl/host_name_verification.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/ssl/host_name_verification.hpp>
+#endif
+
 DECLARE_LOG_OBJECT()
 
 using namespace ASIO::ip;
@@ -170,13 +178,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
       socket_(executor_->createSocket()),
-#if defined(USE_ASIO) || BOOST_VERSION >= 107000
       strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
-#elif BOOST_VERSION >= 106600
-      strand_(executor_->getIOService().get_executor()),
-#else
-      strand_(executor_->getIOService()),
-#endif
       logicalAddress_(logicalAddress),
       physicalAddress_(physicalAddress),
       cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -266,7 +268,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
         if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
             LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
             std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
-            tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
+            tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
         }
 
         LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -309,7 +311,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
         // Only send keep-alive probes if the broker supports it
         keepAliveTimer_ = executor_->createDeadlineTimer();
         if (keepAliveTimer_) {
-            keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
+            keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
             auto weakSelf = weak_from_this();
             keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
                 auto self = weakSelf.lock();
@@ -354,7 +356,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
     // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
     // Check if we have a timer still before we set the request timer to pop again.
     if (consumerStatsRequestTimer_) {
-        consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+        consumerStatsRequestTimer_->expires_after(operationsTimeout_);
         auto weakSelf = weak_from_this();
         consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
             auto self = weakSelf.lock();
@@ -394,7 +396,7 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep
  *  if async_connect without any error, connected_ would be set to true
  *  at this point the connection is deemed valid to be used by clients of this class
  */
-void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
+void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
     if (!err) {
         std::stringstream cnxStringStream;
         try {
@@ -479,38 +481,13 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
         } else {
             handleHandshake(ASIO_SUCCESS);
         }
-    } else if (endpointIterator != tcp::resolver::iterator()) {
-        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
-        // The connection failed. Try the next endpoint in the list.
-        ASIO_ERROR closeError;
-        socket_->close(closeError);  // ignore the error of close
-        if (closeError) {
-            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
-        }
-        connectTimeoutTask_->stop();
-        ++endpointIterator;
-        if (endpointIterator != tcp::resolver::iterator()) {
-            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
-            connectTimeoutTask_->start();
-            tcp::endpoint endpoint = *endpointIterator;
-            auto weakSelf = weak_from_this();
-            socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
-                auto self = weakSelf.lock();
-                if (self) {
-                    self->handleTcpConnected(err, endpointIterator);
-                }
-            });
-        } else {
-            if (err == ASIO::error::operation_aborted) {
-                // TCP connect timeout, which is not retryable
-                close();
-            } else {
-                close(ResultRetryable);
-            }
-        }
     } else {
         LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
-        close(ResultRetryable);
+        if (err == ASIO::error::operation_aborted) {
+            close();
+        } else {
+            close(ResultRetryable);
+        }
     }
 }
 
@@ -603,18 +580,18 @@ void ClientConnection::tcpConnectAsync() {
     }
 
     LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
-    tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
+
     auto weakSelf = weak_from_this();
-    resolver_->async_resolve(query,
-                             [weakSelf](const ASIO_ERROR& err, const tcp::resolver::iterator& iterator) {
+    resolver_->async_resolve(service_url.host(), std::to_string(service_url.port()),
+                             [weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
                                  auto self = weakSelf.lock();
                                  if (self) {
-                                     self->handleResolve(err, iterator);
+                                     self->handleResolve(err, results);
                                  }
                              });
 }
 
-void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver::iterator& endpointIterator) {
+void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::results_type& results) {
     if (err) {
         std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
         LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
@@ -641,23 +618,13 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver:
         }
         ptr->connectTimeoutTask_->stop();
     });
-
-    LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
     connectTimeoutTask_->start();
-    if (endpointIterator != tcp::resolver::iterator()) {
-        LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name()  //
-                             << " to " << endpointIterator->endpoint());
-        socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
-            auto self = weakSelf.lock();
-            if (self) {
-                self->handleTcpConnected(err, endpointIterator);
-            }
-        });
-    } else {
-        LOG_WARN(cnxString_ << "No IP address found");
-        close();
-        return;
-    }
+    ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
+        auto self = weakSelf.lock();
+        if (self) {
+            self->handleTcpConnected(err, endpoint);
+        }
+    });
 }
 
 void ClientConnection::readNextCommand() {
@@ -1061,7 +1028,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, 
uint64_t requestId,
     LookupRequestData requestData;
     requestData.promise = promise;
     requestData.timer = executor_->createDeadlineTimer();
-    requestData.timer->expires_from_now(operationsTimeout_);
+    requestData.timer->expires_after(operationsTimeout_);
     auto weakSelf = weak_from_this();
     requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
         auto self = weakSelf.lock();
@@ -1201,7 +1168,7 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(const SharedBuf
 
     PendingRequestData requestData;
     requestData.timer = executor_->createDeadlineTimer();
-    requestData.timer->expires_from_now(operationsTimeout_);
+    requestData.timer->expires_after(operationsTimeout_);
     auto weakSelf = weak_from_this();
     requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
         auto self = weakSelf.lock();
@@ -1256,7 +1223,7 @@ void ClientConnection::handleKeepAliveTimeout() {
         // be zero And we do not attempt to dereference the pointer.
         Lock lock(mutex_);
         if (keepAliveTimer_) {
-            keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
+            keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
             auto weakSelf = weak_from_this();
             keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
                 auto self = weakSelf.lock();
@@ -1318,12 +1285,12 @@ void ClientConnection::close(Result result, bool 
detach) {
     numOfPendingLookupRequest_ = 0;
 
     if (keepAliveTimer_) {
-        keepAliveTimer_->cancel();
+        cancelTimer(*keepAliveTimer_);
         keepAliveTimer_.reset();
     }
 
     if (consumerStatsRequestTimer_) {
-        consumerStatsRequestTimer_->cancel();
+        cancelTimer(*consumerStatsRequestTimer_);
         consumerStatsRequestTimer_.reset();
     }
 
@@ -1435,7 +1402,7 @@ Future<Result, GetLastMessageIdResponse> 
ClientConnection::newGetLastMessageId(u
     LastMessageIdRequestData requestData;
     requestData.promise = promise;
     requestData.timer = executor_->createDeadlineTimer();
-    requestData.timer->expires_from_now(operationsTimeout_);
+    requestData.timer->expires_after(operationsTimeout_);
     auto weakSelf = weak_from_this();
     requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
         auto self = weakSelf.lock();
@@ -1483,7 +1450,7 @@ Future<Result, SchemaInfo> 
ClientConnection::newGetSchema(const std::string& top
     lock.unlock();
 
     auto weakSelf = weak_from_this();
-    timer->expires_from_now(operationsTimeout_);
+    timer->expires_after(operationsTimeout_);
     timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (!self) {
@@ -1570,7 +1537,7 @@ void ClientConnection::handleSuccess(const 
proto::CommandSuccess& success) {
         lock.unlock();
 
         requestData.promise.setValue({});
-        requestData.timer->cancel();
+        cancelTimer(*requestData.timer);
     }
 }
 
@@ -1582,7 +1549,8 @@ void ClientConnection::handlePartitionedMetadataResponse(
     Lock lock(mutex_);
     auto it = 
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
     if (it != pendingLookupRequests_.end()) {
-        it->second.timer->cancel();
+        cancelTimer(*it->second.timer);
+
         LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
         pendingLookupRequests_.erase(it);
         numOfPendingLookupRequest_--;
@@ -1661,7 +1629,7 @@ void ClientConnection::handleLookupTopicRespose(
     Lock lock(mutex_);
     auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
     if (it != pendingLookupRequests_.end()) {
-        it->second.timer->cancel();
+        cancelTimer(*it->second.timer);
         LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
         pendingLookupRequests_.erase(it);
         numOfPendingLookupRequest_--;
@@ -1739,7 +1707,7 @@ void ClientConnection::handleProducerSuccess(const 
proto::CommandProducerSuccess
                 data.topicEpoch = boost::none;
             }
             requestData.promise.setValue(data);
-            requestData.timer->cancel();
+            cancelTimer(*requestData.timer);
         }
     }
 }
@@ -1759,7 +1727,7 @@ void ClientConnection::handleError(const 
proto::CommandError& error) {
         lock.unlock();
 
         requestData.promise.setFailed(result);
-        requestData.timer->cancel();
+        cancelTimer(*requestData.timer);
     } else {
         PendingGetLastMessageIdRequestsMap::iterator it =
             pendingGetLastMessageIdRequests_.find(error.request_id());
@@ -2052,8 +2020,8 @@ void ClientConnection::unsafeRemovePendingRequest(long 
requestId) {
     auto it = pendingRequests_.find(requestId);
     if (it != pendingRequests_.end()) {
         it->second.promise.setFailed(ResultDisconnected);
-        ASIO_ERROR ec;
-        it->second.timer->cancel(ec);
+        cancelTimer(*it->second.timer);
+
         pendingRequests_.erase(it);
     }
 }
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index ff40281..cf6be65 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -26,13 +26,13 @@
 #include <cstdint>
 #ifdef USE_ASIO
 #include <asio/bind_executor.hpp>
-#include <asio/io_service.hpp>
+#include <asio/io_context.hpp>
 #include <asio/ip/tcp.hpp>
 #include <asio/ssl/stream.hpp>
 #include <asio/strand.hpp>
 #else
 #include <boost/asio/bind_executor.hpp>
-#include <boost/asio/io_service.hpp>
+#include <boost/asio/io_context.hpp>
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl/stream.hpp>
 #include <boost/asio/strand.hpp>
@@ -238,7 +238,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      * although not usable at this point, since this is just tcp connection
      * Pulsar - Connect/Connected has yet to happen
      */
-    void handleTcpConnected(const ASIO_ERROR& err, 
ASIO::ip::tcp::resolver::iterator endpointIterator);
+    void handleTcpConnected(const ASIO_ERROR& err, const 
ASIO::ip::tcp::endpoint& endpoint);
 
     void handleHandshake(const ASIO_ERROR& err);
 
@@ -261,7 +261,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
 
-    void handleResolve(const ASIO_ERROR& err, const 
ASIO::ip::tcp::resolver::iterator& endpointIterator);
+    void handleResolve(ASIO_ERROR err, const 
ASIO::ip::tcp::resolver::results_type& results);
 
     void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
     void handleSendPair(const ASIO_ERROR& err);
@@ -325,7 +325,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      */
     SocketPtr socket_;
     TlsSocketPtr tlsSocket_;
-    ASIO::strand<ASIO::io_service::executor_type> strand_;
+    ASIO::strand<ASIO::io_context::executor_type> strand_;
 
     const std::string logicalAddress_;
     /*
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 401d0a1..325adda 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -423,7 +423,7 @@ void ConsumerImpl::discardChunkMessages(const std::string& 
uuid, const MessageId
 }
 
 void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
-    
checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    
checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
     std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
     checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& 
ec) -> void {
         auto self = weakSelf.lock();
@@ -1690,7 +1690,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const 
BackoffPtr& backoff, Time
         }
         remainTime -= next;
 
-        timer->expires_from_now(next);
+        timer->expires_after(next);
 
         auto self = shared_from_this();
         timer->async_wait([this, backoff, remainTime, timer, next, callback,
@@ -1814,9 +1814,8 @@ std::shared_ptr<ConsumerImpl> 
ConsumerImpl::get_shared_this_ptr() {
 }
 
 void ConsumerImpl::cancelTimers() noexcept {
-    ASIO_ERROR ec;
-    batchReceiveTimer_->cancel(ec);
-    checkExpiredChunkedTimer_->cancel(ec);
+    cancelTimer(*batchReceiveTimer_);
+    cancelTimer(*checkExpiredChunkedTimer_);
     unAckedMessageTrackerPtr_->stop();
     consumerStatsBasePtr_->stop();
 }
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 2963a67..171256d 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -49,7 +49,7 @@ ConsumerImplBase::ConsumerImplBase(const ClientImplPtr& 
client, const std::strin
 
 void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
     if (timeoutMs > 0) {
-        
batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
+        
batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
         std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
         batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
             auto self = weakSelf.lock();
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index 794e361..99e2393 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -31,17 +31,16 @@ ExecutorService::~ExecutorService() { close(0); }
 void ExecutorService::start() {
     auto self = shared_from_this();
     std::thread t{[this, self] {
-        LOG_DEBUG("Run io_service in a single thread");
-        ASIO_ERROR ec;
+        LOG_DEBUG("Run io_context in a single thread");
         while (!closed_) {
-            io_service_.restart();
-            IOService::work work{getIOService()};
-            io_service_.run(ec);
-        }
-        if (ec) {
-            LOG_ERROR("Failed to run io_service: " << ec.message());
-        } else {
-            LOG_DEBUG("Event loop of ExecutorService exits successfully");
+            io_context_.restart();
+            auto work_guard = ASIO::make_work_guard(io_context_);
+            try {
+                io_context_.run();
+                LOG_DEBUG("Event loop of ExecutorService exits successfully");
+            } catch (const ASIO_SYSTEM_ERROR &e) {
+                LOG_ERROR("Failed to run io_context: " << e.what());
+            }
         }
         {
             std::lock_guard<std::mutex> lock{mutex_};
@@ -63,12 +62,12 @@ ExecutorServicePtr ExecutorService::create() {
 }
 
 /*
- *  factory method of ASIO::ip::tcp::socket associated with io_service_ instance
+ *  factory method of ASIO::ip::tcp::socket associated with io_context_ instance
  *  @ returns shared_ptr to this socket
  */
 SocketPtr ExecutorService::createSocket() {
     try {
-        return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
+        return SocketPtr(new ASIO::ip::tcp::socket(io_context_));
     } catch (const ASIO_SYSTEM_ERROR &e) {
         restart();
         auto error = std::string("Failed to create socket: ") + e.what();
@@ -82,12 +81,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr 
&socket, ASIO::ssl::cont
 }
 
 /*
- *  factory method of Resolver object associated with io_service_ instance
+ *  factory method of Resolver object associated with io_context_ instance
  *  @returns shraed_ptr to resolver object
  */
 TcpResolverPtr ExecutorService::createTcpResolver() {
     try {
-        return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
+        return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_));
     } catch (const ASIO_SYSTEM_ERROR &e) {
         restart();
         auto error = std::string("Failed to create resolver: ") + e.what();
@@ -97,7 +96,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
 
 DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
     try {
-        return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
+        return DeadlineTimerPtr(new ASIO::steady_timer(io_context_));
     } catch (const ASIO_SYSTEM_ERROR &e) {
         restart();
         auto error = std::string("Failed to create steady_timer: ") + e.what();
@@ -105,7 +104,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
     }
 }
 
-void ExecutorService::restart() { io_service_.stop(); }
+void ExecutorService::restart() { io_context_.stop(); }
 
 void ExecutorService::close(long timeoutMs) {
     bool expectedState = false;
@@ -113,12 +112,12 @@ void ExecutorService::close(long timeoutMs) {
         return;
     }
     if (timeoutMs == 0) {  // non-blocking
-        io_service_.stop();
+        io_context_.stop();
         return;
     }
 
     std::unique_lock<std::mutex> lock{mutex_};
-    io_service_.stop();
+    io_context_.stop();
     if (timeoutMs > 0) {
         cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { 
return ioServiceDone_; });
     } else {  // < 0
@@ -126,7 +125,7 @@ void ExecutorService::close(long timeoutMs) {
     }
 }
 
-void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
+void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, std::move(task)); }
 
 /////////////////////
 
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index 89d06d3..626cb20 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -23,11 +23,11 @@
 
 #include <atomic>
 #ifdef USE_ASIO
-#include <asio/io_service.hpp>
+#include <asio/io_context.hpp>
 #include <asio/ip/tcp.hpp>
 #include <asio/ssl.hpp>
 #else
-#include <boost/asio/io_service.hpp>
+#include <boost/asio/io_context.hpp>
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl.hpp>
 #endif
@@ -46,7 +46,7 @@ typedef 
std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt
 typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
 class PULSAR_PUBLIC ExecutorService : public 
std::enable_shared_from_this<ExecutorService> {
    public:
-    using IOService = ASIO::io_service;
+    using IOService = ASIO::io_context;
     using SharedPtr = std::shared_ptr<ExecutorService>;
 
     static SharedPtr create();
@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public 
std::enable_shared_from_this<Execut
     // See TimeoutProcessor for the semantics of the parameter.
     void close(long timeoutMs = 3000);
 
-    IOService &getIOService() { return io_service_; }
+    IOService &getIOService() { return io_context_; }
     bool isClosed() const noexcept { return closed_; }
 
    private:
     /*
-     * io_service is our interface to os, io object schedule async ops on this object
+     * io_context is our interface to os, io object schedule async ops on this object
      */
-    IOService io_service_;
+    IOService io_context_;
 
     std::atomic_bool closed_{false};
     std::mutex mutex_;
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 65aa0db..ffc4e2c 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const 
std::string& topic,
       redirectedClusterURI_("") {}
 
 HandlerBase::~HandlerBase() {
-    ASIO_ERROR ignored;
-    timer_->cancel(ignored);
-    creationTimer_->cancel(ignored);
+    cancelTimer(*timer_);
+    cancelTimer(*creationTimer_);
 }
 
 void HandlerBase::start() {
@@ -61,15 +60,14 @@ void HandlerBase::start() {
     if (state_.compare_exchange_strong(state, Pending)) {
         grabCnx();
     }
-    creationTimer_->expires_from_now(operationTimeut_);
+    creationTimer_->expires_after(operationTimeut_);
     std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
     creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
         auto self = weakSelf.lock();
         if (self && !error) {
             LOG_WARN("Cancel the pending reconnection due to the start timeout");
             connectionFailed(ResultTimeout);
-            ASIO_ERROR ignored;
-            timer_->cancel(ignored);
+            cancelTimer(*timer_);
         }
     });
 }
@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
                     connectionTimeMs_ =
                         duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
                     // Prevent the creationTimer_ from cancelling the timer_ in future
-                    ASIO_ERROR ignored;
-                    creationTimer_->cancel(ignored);
+                    cancelTimer(*creationTimer_);
                     LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
                 } else if (isResultRetryable(result)) {
                     scheduleReconnection();
@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig
         TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();
 
         LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
-        timer_->expires_from_now(delay);
+        timer_->expires_after(delay);
         // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
         // so we will not run into the case where grabCnx is invoked on out of scope handler
         auto name = getName();
diff --git a/lib/JsonUtils.h b/lib/JsonUtils.h
index 841eb5e..2bd4469 100644
--- a/lib/JsonUtils.h
+++ b/lib/JsonUtils.h
@@ -28,14 +28,7 @@ template <typename Ptree>
 inline std::string toJson(const Ptree& pt) {
     std::ostringstream oss;
     boost::property_tree::write_json(oss, pt, false);
-    // For Boost < 1.86, boost::property_tree will write a endline at the end
-#if BOOST_VERSION < 108600
-    auto s = oss.str();
-    s.pop_back();
-    return s;
-#else
     return oss.str();
-#endif
 }
 
 }  // namespace pulsar
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 3ce8e3c..eb3546d 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -507,7 +507,7 @@ void MultiTopicsConsumerImpl::closeAsync(const 
ResultCallback& originalCallback)
     failPendingBatchReceiveCallback();
 
     // cancel timer
-    batchReceiveTimer_->cancel();
+    cancelTimer(*batchReceiveTimer_);
 }
 
 void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const 
Message& msg) {
@@ -973,7 +973,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
     return numberOfConnectedConsumer;
 }
 void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
-    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+    partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
     auto weakSelf = weak_from_this();
     partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
         // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
@@ -1126,8 +1126,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
 
 void MultiTopicsConsumerImpl::cancelTimers() noexcept {
     if (partitionsUpdateTimer_) {
-        ASIO_ERROR ec;
-        partitionsUpdateTimer_->cancel(ec);
+        cancelTimer(*partitionsUpdateTimer_);
     }
 }
 
diff --git a/lib/Murmur3_32Hash.cc b/lib/Murmur3_32Hash.cc
index 1b214dd..45a4988 100644
--- a/lib/Murmur3_32Hash.cc
+++ b/lib/Murmur3_32Hash.cc
@@ -23,12 +23,8 @@
 // the orignal MurmurHash3 source code.
 #include "Murmur3_32Hash.h"
 
-#include <boost/version.hpp>
-#if BOOST_VERSION >= 105500
 #include <boost/predef.h>
-#else
-#include <boost/detail/endian.hpp>
-#endif
+
 #include <limits>
 
 #if BOOST_COMP_MSVC
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index ba008a8..e5f439d 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -56,7 +56,7 @@ void NegativeAcksTracker::scheduleTimer() {
         return;
     }
     std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
-    timer_->expires_from_now(timerInterval_);
+    timer_->expires_after(timerInterval_);
     timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
         if (auto self = weakSelf.lock()) {
             self->handleTimer(ec);
@@ -135,8 +135,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
 
 void NegativeAcksTracker::close() {
     closed_ = true;
-    ASIO_ERROR ec;
-    timer_->cancel(ec);
+    cancelTimer(*timer_);
     std::lock_guard<std::mutex> lock(mutex_);
     nackedMessages_.clear();
 }
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 9b2f5c6..4a92366 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
 
 void PartitionedProducerImpl::runPartitionUpdateTask() {
     auto weakSelf = weak_from_this();
-    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+    partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
     partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (self) {
@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
 
 void PartitionedProducerImpl::cancelTimers() noexcept {
     if (partitionsUpdateTimer_) {
-        ASIO_ERROR ec;
-        partitionsUpdateTimer_->cancel(ec);
+        cancelTimer(*partitionsUpdateTimer_);
     }
 }
 
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 9f3fbb9..fd48fee 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern()
 
 void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
     autoDiscoveryRunning_ = false;
-    autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+    autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
 
     auto weakSelf = weak_from_this();
     autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
@@ -232,7 +232,7 @@ void PatternMultiTopicsConsumerImpl::start() {
     LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
 
     if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
-        autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+        autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
         auto weakSelf = weak_from_this();
         autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
             if (auto self = weakSelf.lock()) {
@@ -252,7 +252,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(const ResultCallback& callback)
     MultiTopicsConsumerImpl::closeAsync(callback);
 }
 
-void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
-    ASIO_ERROR ec;
-    autoDiscoveryTimer_->cancel(ec);
-}
+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { cancelTimer(*autoDiscoveryTimer_); }
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 9fde012..c68d23a 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -29,7 +29,7 @@ void PeriodicTask::start() {
     state_ = Ready;
     if (periodMs_ >= 0) {
         std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
-        timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
+        timer_->expires_after(std::chrono::milliseconds(periodMs_));
         timer_->async_wait([weakSelf](const ErrorCode& ec) {
             auto self = weakSelf.lock();
             if (self) {
@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept {
     if (!state_.compare_exchange_strong(state, Closing)) {
         return;
     }
-    ErrorCode ec;
-    timer_->cancel(ec);
+    cancelTimer(*timer_);
     state_ = Pending;
 }
 
@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
     // state_ may be changed in handleTimeout, so we check state_ again
     if (state_ == Ready) {
         auto self = shared_from_this();
-        timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
+        timer_->expires_after(std::chrono::milliseconds(periodMs_));
         timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
     }
 }
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index cc8f320..21c38c4 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
         bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
         bool isFull = batchMessageContainer_->add(msg, callback);
         if (isFirstMessage) {
-            batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+            batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
             auto weakSelf = weak_from_this();
             batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
                 auto self = weakSelf.lock();
@@ -699,7 +699,7 @@ void ProducerImpl::releaseSemaphoreForSendOp(const OpSendMsg& op) {
 PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCallback) {
     PendingFailures failures;
     LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
-    batchTimer_->cancel();
+    cancelTimer(*batchTimer_);
     if (batchMessageContainer_->isEmpty()) {
         return failures;
     }
@@ -1009,9 +1009,8 @@ void ProducerImpl::internalShutdown() {
 
 void ProducerImpl::cancelTimers() noexcept {
     dataKeyRefreshTask_.stop();
-    ASIO_ERROR ec;
-    batchTimer_->cancel(ec);
-    sendTimer_->cancel(ec);
+    cancelTimer(*batchTimer_);
+    cancelTimer(*sendTimer_);
 }
 
 bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
@@ -1032,7 +1031,7 @@ void ProducerImpl::startSendTimeoutTimer() {
 }
 
 void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
-    sendTimer_->expires_from_now(expiryTime);
+    sendTimer_->expires_after(expiryTime);
 
     auto weakSelf = weak_from_this();
     sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index dba190f..f4b056e 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -68,8 +68,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
 
     void cancel() {
         promise_.setFailed(ResultDisconnected);
-        ASIO_ERROR ec;
-        timer_->cancel(ec);
+        cancelTimer(*timer_);
     }
 
    private:
@@ -107,7 +106,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
             }
 
             auto delay = std::min(backoff_.next(), remainingTime);
-            timer_->expires_from_now(delay);
+            timer_->expires_after(delay);
 
             auto nextRemainingTime = remainingTime - delay;
             LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay)
diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h
index 033802f..c8779dc 100644
--- a/lib/SharedBuffer.h
+++ b/lib/SharedBuffer.h
@@ -151,11 +151,11 @@ class SharedBuffer {
 
     inline bool writable() const { return writableBytes() > 0; }
 
-    ASIO::const_buffers_1 const_asio_buffer() const {
-        return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes());
+    ASIO::const_buffer const_asio_buffer() const {
+        return ASIO::const_buffer(ptr_ + readIdx_, readableBytes());
     }
 
-    ASIO::mutable_buffers_1 asio_buffer() {
+    ASIO::mutable_buffer asio_buffer() {
         assert(data_);
         return ASIO::buffer(ptr_ + writeIdx_, writableBytes());
     }
diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc
index b61c4c9..3e9ce0e 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -38,7 +38,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
     }
     ExecutorServicePtr executorService = client->getIOExecutorProvider()->get();
     timer_ = executorService->createDeadlineTimer();
-    timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_));
+    timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_));
     std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
     timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
@@ -177,9 +177,8 @@ void UnAckedMessageTrackerEnabled::clear() {
 }
 
 void UnAckedMessageTrackerEnabled::stop() {
-    ASIO_ERROR ec;
     if (timer_) {
-        timer_->cancel(ec);
+        cancelTimer(*timer_);
     }
 }
 } /* namespace pulsar */
diff --git a/lib/checksum/crc32c_sse42.cc b/lib/checksum/crc32c_sse42.cc
index 8c52a27..8a24aeb 100644
--- a/lib/checksum/crc32c_sse42.cc
+++ b/lib/checksum/crc32c_sse42.cc
@@ -15,18 +15,8 @@
  
******************************************************************************/
 #include "crc32c_sse42.h"
 
-#include <boost/version.hpp>
-#if BOOST_VERSION >= 105500
-#include <boost/predef.h>
-#else
-#if _MSC_VER
-#pragma message("Boost version is < 1.55, disable CRC32C")
-#else
-#warning "Boost version is < 1.55, disable CRC32C"
-#endif
-#endif
-
 #include <assert.h>
+#include <boost/predef.h>
 #include <stdlib.h>
 
 #include "gf2.hpp"
diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc
index 74c7117..3dd1a73 100644
--- a/lib/stats/ConsumerStatsImpl.cc
+++ b/lib/stats/ConsumerStatsImpl.cc
@@ -59,7 +59,7 @@ void ConsumerStatsImpl::flushAndReset(const ASIO_ERROR& ec) {
     LOG_INFO(oss.str());
 }
 
-ConsumerStatsImpl::~ConsumerStatsImpl() { timer_->cancel(); }
+ConsumerStatsImpl::~ConsumerStatsImpl() { cancelTimer(*timer_); }
 
 void ConsumerStatsImpl::start() { scheduleTimer(); }
 
@@ -80,7 +80,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy
 }
 
 void ConsumerStatsImpl::scheduleTimer() {
-    timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
+    timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
     std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
     timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h
index 0ce0ef1..726fc7e 100644
--- a/lib/stats/ConsumerStatsImpl.h
+++ b/lib/stats/ConsumerStatsImpl.h
@@ -55,10 +55,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>
     ConsumerStatsImpl(const ConsumerStatsImpl& stats);
     void flushAndReset(const ASIO_ERROR&);
     void start() override;
-    void stop() override {
-        ASIO_ERROR error;
-        timer_->cancel(error);
-    }
+    void stop() override { cancelTimer(*timer_); }
     void receivedMessage(Message&, Result) override;
     void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
     virtual ~ConsumerStatsImpl();
diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc
index 8aae280..a42532d 100644
--- a/lib/stats/ProducerStatsImpl.cc
+++ b/lib/stats/ProducerStatsImpl.cc
@@ -106,10 +106,10 @@ void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) {
     totalSendMap_[res] += 1;  // Value will automatically be initialized to 0 in the constructor
 }
 
-ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }
+ProducerStatsImpl::~ProducerStatsImpl() { cancelTimer(*timer_); }
 
 void ProducerStatsImpl::scheduleTimer() {
-    timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
+    timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
     std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
     timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h
index 495a89c..e189c5f 100644
--- a/lib/stats/ProducerStatsImpl.h
+++ b/lib/stats/ProducerStatsImpl.h
@@ -20,17 +20,14 @@
 #ifndef PULSAR_PRODUCER_STATS_IMPL_HEADER
 #define PULSAR_PRODUCER_STATS_IMPL_HEADER
 
-#include <map>
-
-#if BOOST_VERSION >= 106400
-#include <boost/serialization/array_wrapper.hpp>
-#endif
 #include <boost/accumulators/accumulators.hpp>
 #include <boost/accumulators/framework/accumulator_set.hpp>
 #include <boost/accumulators/framework/features.hpp>
 #include <boost/accumulators/statistics.hpp>
 #include <boost/accumulators/statistics/extended_p_square.hpp>
+#include <boost/serialization/array_wrapper.hpp>
 #include <iostream>
+#include <map>
 #include <memory>
 #include <mutex>
 #include <vector>
diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc
index e48dbfb..6c6b898 100644
--- a/tests/AuthPluginTest.cc
+++ b/tests/AuthPluginTest.cc
@@ -370,7 +370,7 @@ class SocketStream {
 
 void mockZTS(Latch& latch, int port) {
     LOG_INFO("-- MockZTS started");
-    ASIO::io_service io;
+    ASIO::io_context io;
     ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port));
 
     LOG_INFO("-- MockZTS waiting for connnection");
diff --git a/tests/ConsumerTest.h b/tests/ConsumerTest.h
index 8248287..9d190c1 100644
--- a/tests/ConsumerTest.h
+++ b/tests/ConsumerTest.h
@@ -46,8 +46,8 @@ class ConsumerTest {
             return nullptr;
         }
         auto timer = cnx->executor_->createDeadlineTimer();
-        timer->expires_from_now(delaySinceStartGrabCnx -
-                                std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
+        timer->expires_after(delaySinceStartGrabCnx -
+                             std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
         timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); });
         return timer;
     }
diff --git a/vcpkg b/vcpkg
index d6995a0..0d9d468 160000
--- a/vcpkg
+++ b/vcpkg
@@ -1 +1 @@
-Subproject commit d6995a0cf3cafda5e9e52749fad075dd62bfd90c
+Subproject commit 0d9d4684352ba8de70bdf251c6fc9a3c464fa12b
diff --git a/vcpkg.json b/vcpkg.json
index 5ec0e0e..83c926a 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -9,19 +9,19 @@
       "features": [
         "openssl"
       ],
-      "version>=": "1.28.2"
+      "version>=": "1.32.0"
     },
     {
       "name": "boost-accumulators",
-      "version>=": "1.83.0"
+      "version>=": "1.88.0"
     },
     {
       "name": "boost-format",
-      "version>=": "1.83.0"
+      "version>=": "1.88.0"
     },
     {
       "name": "boost-property-tree",
-      "version>=": "1.83.0"
+      "version>=": "1.88.0"
     },
     {
       "name": "curl",
@@ -61,12 +61,21 @@
     }
   ],
   "features": {
+    "boost-asio": {
+      "description": "Use Boost.Asio instead of standalone Asio",
+      "dependencies": [
+        {
+          "name": "boost-asio",
+          "version>=": "1.88.0"
+        }
+      ]
+    },
     "perf": {
       "description": "Build Performance Tool",
       "dependencies": [
         {
           "name": "boost-program-options",
-          "version>=": "1.83.0"
+          "version>=": "1.88.0"
         }
       ]
     },
@@ -81,10 +90,6 @@
     }
   },
   "overrides": [
-    {
-      "name": "asio",
-      "version": "1.28.2"
-    },
     {
       "name": "protobuf",
       "version": "3.21.12"


Reply via email to