This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 90ea369  Depend on the independent Asio instead of Boost.Asio by 
default (#382)
90ea369 is described below

commit 90ea3695b80660f837785d98e96047d90de3f64f
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jan 8 02:13:49 2024 +0800

    Depend on the independent Asio instead of Boost.Asio by default (#382)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/367
    
    ### Motivation
    
    See the difference between Asio and Boost.Asio here: 
https://think-async.com/Asio/AsioAndBoostAsio.html
    
    Asio is updated more frequently than Boost.Asio, and its releases do not
    need to be synchronized with other Boost components. Depending on the
    independent Asio makes it easier to adopt a newer Asio release.
    
    ### Modifications
    
    Import `asio` 1.28.2 as the dependency and remove the `boost-asio`
    dependency from the vcpkg.json. Since the latest Asio already removed
    the `deadline_timer`, this patch replaces all `deadline_timer` with
    `steady_timer`, which uses `std::chrono` rather than the Boost.Date_Time
    component to compute the timeout.
    
    Add a `USE_ASIO` CMake option to determine whether the library depends on
    Asio or Boost.Asio. For vcpkg users, the option is always enabled.
    
    Finally, simplify the vcpkg.json by removing some `boost-*` dependencies
    that are already pulled in indirectly by the two remaining major
    dependencies:
    - boost-accumulators: latency percentiles computation
    - boost-property-tree: JSON operations
    
    These two dependencies are hard to remove for now without introducing
    other dependencies, so they will be kept for some time.
---
 .github/workflows/ci-build-binary-artifacts.yaml   |   2 +
 .github/workflows/ci-pr-validation.yaml            |   2 +
 CMakeLists.txt                                     |  11 +
 LegacyFindPackages.cmake                           |   4 -
 lib/AckGroupingTrackerEnabled.cc                   |   6 +-
 lib/AckGroupingTrackerEnabled.h                    |   3 +-
 .../ProducerStatsDisabled.h => AsioDefines.h}      |  25 +-
 lib/{TimeUtils.cc => AsioTimer.h}                  |  19 +-
 lib/Backoff.cc                                     |   7 +-
 lib/Backoff.h                                      |   8 +-
 lib/ClientConnection.cc                            | 254 +++++++++------------
 lib/ClientConnection.h                             |  73 +++---
 lib/CompressionCodec.cc                            |   1 -
 lib/ConnectionPool.cc                              |   9 +-
 lib/ConsumerImpl.cc                                |  19 +-
 lib/ConsumerImplBase.cc                            |   4 +-
 lib/ExecutorService.cc                             |  24 +-
 lib/ExecutorService.h                              |  20 +-
 lib/HandlerBase.cc                                 |   9 +-
 lib/HandlerBase.h                                  |  10 +-
 lib/Int64SerDes.h                                  |   7 +-
 lib/MultiTopicsConsumerImpl.cc                     |  10 +-
 lib/MultiTopicsConsumerImpl.h                      |   3 +-
 lib/NegativeAcksTracker.cc                         |  12 +-
 lib/NegativeAcksTracker.h                          |   8 +-
 lib/OpSendMsg.h                                    |   6 +-
 lib/PartitionedProducerImpl.cc                     |   8 +-
 lib/PartitionedProducerImpl.h                      |   6 +-
 lib/PatternMultiTopicsConsumerImpl.cc              |  12 +-
 lib/PatternMultiTopicsConsumerImpl.h               |   5 +-
 lib/PeriodicTask.cc                                |   8 +-
 lib/PeriodicTask.h                                 |   2 +-
 lib/ProducerImpl.cc                                |  25 +-
 lib/ProducerImpl.h                                 |  15 +-
 lib/RetryableOperation.h                           |  23 +-
 lib/RoundRobinMessageRouter.cc                     |   5 +-
 lib/RoundRobinMessageRouter.h                      |   6 +-
 lib/SharedBuffer.h                                 |  27 ++-
 lib/TimeUtils.h                                    |  16 +-
 lib/UnAckedMessageTrackerEnabled.cc                |   6 +-
 lib/UnAckedMessageTrackerEnabled.h                 |   3 +-
 lib/auth/athenz/ZTSClient.cc                       |   2 -
 lib/stats/ConsumerStatsImpl.cc                     |   6 +-
 lib/stats/ConsumerStatsImpl.h                      |   5 +-
 lib/stats/ProducerStatsBase.h                      |   4 +-
 lib/stats/ProducerStatsDisabled.h                  |   2 +-
 lib/stats/ProducerStatsImpl.cc                     |  15 +-
 lib/stats/ProducerStatsImpl.h                      |   8 +-
 tests/AuthPluginTest.cc                            |  12 +-
 tests/AuthTokenTest.cc                             |   1 -
 tests/BackoffTest.cc                               |  26 +--
 tests/ConsumerTest.cc                              |   2 +-
 tests/PulsarFriend.h                               |   2 +-
 tests/RoundRobinMessageRouterTest.cc               |  16 +-
 vcpkg.json                                         |  37 +--
 55 files changed, 422 insertions(+), 439 deletions(-)

diff --git a/.github/workflows/ci-build-binary-artifacts.yaml 
b/.github/workflows/ci-build-binary-artifacts.yaml
index a2b7be8..63644e5 100644
--- a/.github/workflows/ci-build-binary-artifacts.yaml
+++ b/.github/workflows/ci-build-binary-artifacts.yaml
@@ -148,6 +148,7 @@ jobs:
           mkdir -p $BUILD_DIR
           cmake -B $BUILD_DIR \
             -G "${{ matrix.generator }}" ${{ matrix.arch }} \
+            -DUSE_ASIO=ON \
             -DBUILD_TESTS=OFF \
             -DVCPKG_TRIPLET=${{ matrix.triplet }} \
             -DCMAKE_INSTALL_PREFIX=${{ env.INSTALL_DIR }} \
@@ -174,6 +175,7 @@ jobs:
           mkdir -p $BUILD_DIR
           cmake -B $BUILD_DIR \
             -G "${{ matrix.generator }}" ${{ matrix.arch }} \
+            -DUSE_ASIO=ON \
             -DBUILD_TESTS=OFF \
             -DVCPKG_TRIPLET=${{ matrix.triplet }} \
             -DCMAKE_INSTALL_PREFIX=$INSTALL_DIR_DEBUG \
diff --git a/.github/workflows/ci-pr-validation.yaml 
b/.github/workflows/ci-pr-validation.yaml
index 50a5c80..56309e9 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -191,6 +191,7 @@ jobs:
             cmake \
               -B ./build-1 \
               -G "${{ matrix.generator }}" ${{ matrix.arch }} \
+              -DUSE_ASIO=ON \
               -DBUILD_TESTS=OFF \
               -DVCPKG_TRIPLET="${{ matrix.triplet }}" \
               -DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \
@@ -232,6 +233,7 @@ jobs:
             cmake \
               -B ./build-2 \
               -G "${{ matrix.generator }}" ${{ matrix.arch }} \
+              -DUSE_ASIO=ON \
               -DBUILD_TESTS=OFF \
               -DVCPKG_TRIPLET="${{ matrix.triplet }}" \
               -DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5bde2b7..662e84a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -19,8 +19,11 @@
 
 cmake_minimum_required(VERSION 3.13)
 
+option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
+
 option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
 if (INTEGRATE_VCPKG)
+    set(USE_ASIO ON)
     set(CMAKE_TOOLCHAIN_FILE 
"${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
 endif ()
 
@@ -129,6 +132,10 @@ if (INTEGRATE_VCPKG)
         
$<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>
         Snappy::snappy
         )
+    if (USE_ASIO)
+        find_package(asio CONFIG REQUIRED)
+        set(COMMON_LIBS ${COMMON_LIBS} asio::asio)
+    endif ()
     add_definitions(-DHAS_ZSTD -DHAS_SNAPPY)
     if (MSVC)
         find_package(dlfcn-win32 CONFIG REQUIRED)
@@ -140,6 +147,10 @@ else ()
     include(./LegacyFindPackages.cmake)
 endif ()
 
+if (USE_ASIO)
+    add_definitions(-DUSE_ASIO)
+endif ()
+
 set(LIB_NAME $ENV{PULSAR_LIBRARY_NAME})
 if (NOT LIB_NAME)
     set(LIB_NAME pulsar)
diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake
index 10c9fa7..5004545 100644
--- a/LegacyFindPackages.cmake
+++ b/LegacyFindPackages.cmake
@@ -176,10 +176,6 @@ if (Boost_MAJOR_VERSION EQUAL 1 AND Boost_MINOR_VERSION 
LESS 69)
     MESSAGE(STATUS "Linking with Boost:System")
 endif()
 
-if (MSVC)
-    set(BOOST_COMPONENTS ${BOOST_COMPONENTS} date_time)
-endif()
-
 if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9)
     # GCC 4.8.2 implementation of std::regex is buggy
     set(BOOST_COMPONENTS ${BOOST_COMPONENTS} regex)
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index d90cc87..7233b2c 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -117,7 +117,7 @@ void AckGroupingTrackerEnabled::close() {
     this->flush();
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     if (this->timer_) {
-        boost::system::error_code ec;
+        ASIO_ERROR ec;
         this->timer_->cancel(ec);
     }
 }
@@ -168,9 +168,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
 
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     this->timer_ = this->executor_->createDeadlineTimer();
-    
this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, 
this->ackGroupingTimeMs_)));
+    this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, 
this->ackGroupingTimeMs_)));
     auto self = shared_from_this();
-    this->timer_->async_wait([this, self](const boost::system::error_code& ec) 
-> void {
+    this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
         if (!ec) {
             this->flush();
             this->scheduleTimer();
diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h
index ec1d66b..b04f405 100644
--- a/lib/AckGroupingTrackerEnabled.h
+++ b/lib/AckGroupingTrackerEnabled.h
@@ -22,18 +22,17 @@
 #include <pulsar/MessageId.h>
 
 #include <atomic>
-#include <boost/asio/deadline_timer.hpp>
 #include <cstdint>
 #include <mutex>
 #include <set>
 
 #include "AckGroupingTracker.h"
+#include "AsioTimer.h"
 
 namespace pulsar {
 
 class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
 class HandlerBase;
diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/AsioDefines.h
similarity index 64%
copy from lib/stats/ProducerStatsDisabled.h
copy to lib/AsioDefines.h
index df1df0f..2e89812 100644
--- a/lib/stats/ProducerStatsDisabled.h
+++ b/lib/AsioDefines.h
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// This header defines common macros to use Asio or Boost.Asio.
+#pragma once
 
-#ifndef PULSAR_PRODUCER_STATS_DISABLED_HEADER
-#define PULSAR_PRODUCER_STATS_DISABLED_HEADER
-#include "ProducerStatsBase.h"
-
-namespace pulsar {
-class ProducerStatsDisabled : public ProducerStatsBase {
-   public:
-    virtual void messageSent(const Message& msg){};
-    virtual void messageReceived(Result, const boost::posix_time::ptime&){};
-};
-}  // namespace pulsar
-#endif  // PULSAR_PRODUCER_STATS_DISABLED_HEADER
+#ifdef USE_ASIO
+#define ASIO ::asio
+#define ASIO_ERROR asio::error_code
+#define ASIO_SUCCESS (ASIO_ERROR{})
+#define ASIO_SYSTEM_ERROR asio::system_error
+#else
+#define ASIO boost::asio
+#define ASIO_ERROR boost::system::error_code
+#define ASIO_SUCCESS 
boost::system::errc::make_error_code(boost::system::errc::success)
+#define ASIO_SYSTEM_ERROR boost::system::system_error
+#endif
diff --git a/lib/TimeUtils.cc b/lib/AsioTimer.h
similarity index 71%
rename from lib/TimeUtils.cc
rename to lib/AsioTimer.h
index 7eecb86..d0c3de5 100644
--- a/lib/TimeUtils.cc
+++ b/lib/AsioTimer.h
@@ -16,17 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#pragma once
 
-#include "TimeUtils.h"
+#ifdef USE_ASIO
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/steady_timer.hpp>
+#endif
 
-namespace pulsar {
+#include <memory>
 
-ptime TimeUtils::now() { return microsec_clock::universal_time(); }
+#include "AsioDefines.h"
 
-int64_t TimeUtils::currentTimeMillis() {
-    static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1));
-
-    time_duration diff = now() - time_t_epoch;
-    return diff.total_milliseconds();
-}
-}  // namespace pulsar
\ No newline at end of file
+using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
diff --git a/lib/Backoff.cc b/lib/Backoff.cc
index fdb1359..e2c43d1 100644
--- a/lib/Backoff.cc
+++ b/lib/Backoff.cc
@@ -21,6 +21,9 @@
 #include <time.h> /* time */
 
 #include <algorithm>
+#include <chrono>
+
+#include "TimeUtils.h"
 
 namespace pulsar {
 
@@ -33,8 +36,8 @@ TimeDuration Backoff::next() {
 
     // Check for mandatory stop
     if (!mandatoryStopMade_) {
-        const boost::posix_time::ptime& now = 
boost::posix_time::microsec_clock::universal_time();
-        TimeDuration timeElapsedSinceFirstBackoff = 
boost::posix_time::milliseconds(0);
+        auto now = TimeUtils::now();
+        TimeDuration timeElapsedSinceFirstBackoff = 
std::chrono::nanoseconds(0);
         if (initial_ == current) {
             firstBackoffTime_ = now;
         } else {
diff --git a/lib/Backoff.h b/lib/Backoff.h
index a59c00f..d9d7fae 100644
--- a/lib/Backoff.h
+++ b/lib/Backoff.h
@@ -20,12 +20,12 @@
 #define _PULSAR_BACKOFF_HEADER_
 #include <pulsar/defines.h>
 
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <chrono>
 #include <random>
 
-namespace pulsar {
+#include "TimeUtils.h"
 
-using TimeDuration = boost::posix_time::time_duration;
+namespace pulsar {
 
 class PULSAR_PUBLIC Backoff {
    public:
@@ -38,7 +38,7 @@ class PULSAR_PUBLIC Backoff {
     const TimeDuration max_;
     TimeDuration next_;
     TimeDuration mandatoryStop_;
-    boost::posix_time::ptime firstBackoffTime_;
+    decltype(std::chrono::high_resolution_clock::now()) firstBackoffTime_;
     std::mt19937 rng_;
     bool mandatoryStopMade_ = false;
 
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 61aa7f7..82ab492 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -23,6 +23,7 @@
 #include <boost/optional.hpp>
 #include <fstream>
 
+#include "AsioDefines.h"
 #include "Commands.h"
 #include "ConnectionPool.h"
 #include "ConsumerImpl.h"
@@ -39,7 +40,7 @@
 
 DECLARE_LOG_OBJECT()
 
-using namespace boost::asio::ip;
+using namespace ASIO::ip;
 
 namespace pulsar {
 
@@ -162,19 +163,13 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
                                    const ClientConfiguration& 
clientConfiguration,
                                    const AuthenticationPtr& authentication, 
const std::string& clientVersion,
                                    ConnectionPool& pool, size_t poolIndex)
-    : 
operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
+    : 
operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())),
       authentication_(authentication),
       serverProtocolVersion_(proto::ProtocolVersion_MIN),
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
       socket_(executor_->createSocket()),
-#if BOOST_VERSION >= 107000
-      
strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
-#elif BOOST_VERSION >= 106600
-      strand_(executor_->getIOService().get_executor()),
-#else
-      strand_(executor_->getIOService()),
-#endif
+      strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
       logicalAddress_(logicalAddress),
       physicalAddress_(physicalAddress),
       cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -203,11 +198,7 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
     }
 
     if (clientConfiguration.isUseTls()) {
-#if BOOST_VERSION >= 105400
-        boost::asio::ssl::context 
ctx(boost::asio::ssl::context::tlsv12_client);
-#else
-        boost::asio::ssl::context ctx(executor_->getIOService(), 
boost::asio::ssl::context::tlsv1_client);
-#endif
+        ASIO::ssl::context ctx(ASIO::ssl::context::tlsv12_client);
         Url serviceUrl;
         Url proxyUrl;
         Url::parse(physicalAddress, serviceUrl);
@@ -219,10 +210,10 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
             LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
         }
         if (clientConfiguration.isTlsAllowInsecureConnection()) {
-            ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
+            ctx.set_verify_mode(ASIO::ssl::context::verify_none);
             isTlsAllowInsecureConnection_ = true;
         } else {
-            ctx.set_verify_mode(boost::asio::ssl::context::verify_peer);
+            ctx.set_verify_mode(ASIO::ssl::context::verify_peer);
 
             std::string trustCertFilePath = 
clientConfiguration.getTlsTrustCertsFilePath();
             if (!trustCertFilePath.empty()) {
@@ -252,12 +243,12 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
                 LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
                 throw ResultAuthenticationError;
             }
-            ctx.use_private_key_file(tlsPrivateKey, 
boost::asio::ssl::context::pem);
-            ctx.use_certificate_file(tlsCertificates, 
boost::asio::ssl::context::pem);
+            ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
+            ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem);
         } else {
             if (file_exists(tlsPrivateKey) && file_exists(tlsCertificates)) {
-                ctx.use_private_key_file(tlsPrivateKey, 
boost::asio::ssl::context::pem);
-                ctx.use_certificate_file(tlsCertificates, 
boost::asio::ssl::context::pem);
+                ctx.use_private_key_file(tlsPrivateKey, 
ASIO::ssl::context::pem);
+                ctx.use_certificate_file(tlsCertificates, 
ASIO::ssl::context::pem);
             }
         }
 
@@ -266,14 +257,13 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
         if (!clientConfiguration.isTlsAllowInsecureConnection() && 
clientConfiguration.isValidateHostName()) {
             LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" 
<< serviceUrl.port());
             std::string urlHost = isSniProxy_ ? proxyUrl.host() : 
serviceUrl.host();
-            
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost));
+            
tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
         }
 
         LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
         if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), 
serviceUrl.host().c_str())) {
-            boost::system::error_code ec{static_cast<int>(::ERR_get_error()),
-                                         
boost::asio::error::get_ssl_category()};
-            LOG_ERROR(boost::system::system_error{ec}.what() << ": Error while 
setting TLS SNI");
+            ASIO_ERROR ec{static_cast<int>(::ERR_get_error()), 
ASIO::error::get_ssl_category()};
+            LOG_ERROR(ec.message() << ": Error while setting TLS SNI");
             return;
         }
     }
@@ -310,9 +300,9 @@ void ClientConnection::handlePulsarConnected(const 
proto::CommandConnected& cmdC
         // Only send keep-alive probes if the broker supports it
         keepAliveTimer_ = executor_->createDeadlineTimer();
         if (keepAliveTimer_) {
-            
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            
keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
             auto weakSelf = weak_from_this();
-            keepAliveTimer_->async_wait([weakSelf](const 
boost::system::error_code&) {
+            keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
                 auto self = weakSelf.lock();
                 if (self) {
                     self->handleKeepAliveTimeout();
@@ -357,13 +347,12 @@ void 
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
     if (consumerStatsRequestTimer_) {
         consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
         auto weakSelf = weak_from_this();
-        consumerStatsRequestTimer_->async_wait(
-            [weakSelf, consumerStatsRequests](const boost::system::error_code& 
err) {
-                auto self = weakSelf.lock();
-                if (self) {
-                    self->handleConsumerStatsTimeout(err, 
consumerStatsRequests);
-                }
-            });
+        consumerStatsRequestTimer_->async_wait([weakSelf, 
consumerStatsRequests](const ASIO_ERROR& err) {
+            auto self = weakSelf.lock();
+            if (self) {
+                self->handleConsumerStatsTimeout(err, consumerStatsRequests);
+            }
+        });
     }
     lock.unlock();
     // Complex logic since promises need to be fulfilled outside the lock
@@ -375,19 +364,19 @@ void 
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
 
 /// The number of unacknowledged probes to send before considering the 
connection dead and notifying the
 /// application layer
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT> 
tcp_keep_alive_count;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT> 
tcp_keep_alive_count;
 
 /// The interval between subsequential keepalive probes, regardless of what 
the connection has exchanged in
 /// the meantime
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, 
TCP_KEEPINTVL> tcp_keep_alive_interval;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPINTVL> 
tcp_keep_alive_interval;
 
 /// The interval between the last data packet sent (simple ACKs are not 
considered data) and the first
 /// keepalive
 /// probe; after the connection is marked to need keepalive, this counter is 
not used any further
 #ifdef __APPLE__
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, 
TCP_KEEPALIVE> tcp_keep_alive_idle;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> 
tcp_keep_alive_idle;
 #else
-typedef boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> 
tcp_keep_alive_idle;
+typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> 
tcp_keep_alive_idle;
 #endif
 
 /*
@@ -396,15 +385,14 @@ typedef 
boost::asio::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> t
  *  if async_connect without any error, connected_ would be set to true
  *  at this point the connection is deemed valid to be used by clients of this 
class
  */
-void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
-                                          tcp::resolver::iterator 
endpointIterator) {
+void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, 
tcp::resolver::iterator endpointIterator) {
     if (!err) {
         std::stringstream cnxStringStream;
         try {
             cnxStringStream << "[" << socket_->local_endpoint() << " -> " << 
socket_->remote_endpoint()
                             << "] ";
             cnxString_ = cnxStringStream.str();
-        } catch (const boost::system::system_error& e) {
+        } catch (const ASIO_SYSTEM_ERROR& e) {
             LOG_ERROR("Failed to get endpoint: " << e.what());
             close(ResultRetryable);
             return;
@@ -424,7 +412,7 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
         state_ = TcpConnected;
         lock.unlock();
 
-        boost::system::error_code error;
+        ASIO_ERROR error;
         socket_->set_option(tcp::no_delay(true), error);
         if (error) {
             LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << 
error.message());
@@ -457,7 +445,7 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
 
         if (tlsSocket_) {
             if (!isTlsAllowInsecureConnection_) {
-                boost::system::error_code err;
+                ASIO_ERROR err;
                 Url service_url;
                 if (!Url::parse(physicalAddress_, service_url)) {
                     LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " 
<< err << " " << err.message());
@@ -466,26 +454,21 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
                 }
             }
             auto weakSelf = weak_from_this();
-            auto callback = [weakSelf](const boost::system::error_code& err) {
+            auto callback = [weakSelf](const ASIO_ERROR& err) {
                 auto self = weakSelf.lock();
                 if (self) {
                     self->handleHandshake(err);
                 }
             };
-#if BOOST_VERSION >= 106600
-            
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
-                                        boost::asio::bind_executor(strand_, 
callback));
-#else
-            
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
-                                        strand_.wrap(callback));
-#endif
+            tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
+                                        ASIO::bind_executor(strand_, 
callback));
         } else {
-            
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
+            handleHandshake(ASIO_SUCCESS);
         }
     } else if (endpointIterator != tcp::resolver::iterator()) {
         LOG_WARN(cnxString_ << "Failed to establish connection: " << 
err.message());
         // The connection failed. Try the next endpoint in the list.
-        boost::system::error_code closeError;
+        ASIO_ERROR closeError;
         socket_->close(closeError);  // ignore the error of close
         if (closeError) {
             LOG_WARN(cnxString_ << "Failed to close socket: " << 
err.message());
@@ -497,15 +480,14 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
             connectTimeoutTask_->start();
             tcp::endpoint endpoint = *endpointIterator;
             auto weakSelf = weak_from_this();
-            socket_->async_connect(endpoint,
-                                   [weakSelf, endpointIterator](const 
boost::system::error_code& err) {
-                                       auto self = weakSelf.lock();
-                                       if (self) {
-                                           self->handleTcpConnected(err, 
endpointIterator);
-                                       }
-                                   });
+            socket_->async_connect(endpoint, [weakSelf, 
endpointIterator](const ASIO_ERROR& err) {
+                auto self = weakSelf.lock();
+                if (self) {
+                    self->handleTcpConnected(err, endpointIterator);
+                }
+            });
         } else {
-            if (err == boost::asio::error::operation_aborted) {
+            if (err == ASIO::error::operation_aborted) {
                 // TCP connect timeout, which is not retryable
                 close();
             } else {
@@ -518,7 +500,7 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
     }
 }
 
-void ClientConnection::handleHandshake(const boost::system::error_code& err) {
+void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
     if (err) {
         LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
         close();
@@ -537,13 +519,12 @@ void ClientConnection::handleHandshake(const 
boost::system::error_code& err) {
     // Send CONNECT command to broker
     auto self = shared_from_this();
     asyncWrite(buffer.const_asio_buffer(),
-               customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err, size_t) {
+               customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& 
err, size_t) {
                    handleSentPulsarConnect(err, buffer);
                }));
 }
 
-void ClientConnection::handleSentPulsarConnect(const 
boost::system::error_code& err,
-                                               const SharedBuffer& buffer) {
+void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const 
SharedBuffer& buffer) {
     if (isClosed()) {
         return;
     }
@@ -557,8 +538,7 @@ void ClientConnection::handleSentPulsarConnect(const 
boost::system::error_code&
     readNextCommand();
 }
 
-void ClientConnection::handleSentAuthResponse(const boost::system::error_code& 
err,
-                                              const SharedBuffer& buffer) {
+void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const 
SharedBuffer& buffer) {
     if (isClosed()) {
         return;
     }
@@ -580,7 +560,7 @@ void ClientConnection::tcpConnectAsync() {
         return;
     }
 
-    boost::system::error_code err;
+    ASIO_ERROR err;
     Url service_url;
     std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
     if (!Url::parse(hostUrl, service_url)) {
@@ -599,17 +579,15 @@ void ClientConnection::tcpConnectAsync() {
     LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << 
service_url.port());
     tcp::resolver::query query(service_url.host(), 
std::to_string(service_url.port()));
     auto weakSelf = weak_from_this();
-    resolver_->async_resolve(
-        query, [weakSelf](const boost::system::error_code& err, 
tcp::resolver::iterator iterator) {
-            auto self = weakSelf.lock();
-            if (self) {
-                self->handleResolve(err, iterator);
-            }
-        });
+    resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, 
tcp::resolver::iterator iterator) {
+        auto self = weakSelf.lock();
+        if (self) {
+            self->handleResolve(err, iterator);
+        }
+    });
 }
 
-void ClientConnection::handleResolve(const boost::system::error_code& err,
-                                     tcp::resolver::iterator endpointIterator) 
{
+void ClientConnection::handleResolve(const ASIO_ERROR& err, 
tcp::resolver::iterator endpointIterator) {
     if (err) {
         std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
         LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << 
err.message());
@@ -642,13 +620,12 @@ void ClientConnection::handleResolve(const 
boost::system::error_code& err,
     if (endpointIterator != tcp::resolver::iterator()) {
         LOG_DEBUG(cnxString_ << "Resolved hostname " << 
endpointIterator->host_name()  //
                              << " to " << endpointIterator->endpoint());
-        socket_->async_connect(*endpointIterator,
-                               [weakSelf, endpointIterator](const 
boost::system::error_code& err) {
-                                   auto self = weakSelf.lock();
-                                   if (self) {
-                                       self->handleTcpConnected(err, 
endpointIterator);
-                                   }
-                               });
+        socket_->async_connect(*endpointIterator, [weakSelf, 
endpointIterator](const ASIO_ERROR& err) {
+            auto self = weakSelf.lock();
+            if (self) {
+                self->handleTcpConnected(err, endpointIterator);
+            }
+        });
     } else {
         LOG_WARN(cnxString_ << "No IP address found");
         close();
@@ -659,15 +636,13 @@ void ClientConnection::handleResolve(const 
boost::system::error_code& err,
 void ClientConnection::readNextCommand() {
     const static uint32_t minReadSize = sizeof(uint32_t);
     auto self = shared_from_this();
-    asyncReceive(
-        incomingBuffer_.asio_buffer(),
-        customAllocReadHandler([this, self](const boost::system::error_code& 
err, size_t bytesTransferred) {
-            handleRead(err, bytesTransferred, minReadSize);
-        }));
+    asyncReceive(incomingBuffer_.asio_buffer(),
+                 customAllocReadHandler([this, self](const ASIO_ERROR& err, 
size_t bytesTransferred) {
+                     handleRead(err, bytesTransferred, minReadSize);
+                 }));
 }
 
-void ClientConnection::handleRead(const boost::system::error_code& err, size_t 
bytesTransferred,
-                                  uint32_t minReadSize) {
+void ClientConnection::handleRead(const ASIO_ERROR& err, size_t 
bytesTransferred, uint32_t minReadSize) {
     if (isClosed()) {
         return;
     }
@@ -675,9 +650,9 @@ void ClientConnection::handleRead(const 
boost::system::error_code& err, size_t b
     incomingBuffer_.bytesWritten(bytesTransferred);
 
     if (err || bytesTransferred == 0) {
-        if (err == boost::asio::error::operation_aborted) {
+        if (err == ASIO::error::operation_aborted) {
             LOG_DEBUG(cnxString_ << "Read operation was canceled: " << 
err.message());
-        } else if (bytesTransferred == 0 || err == boost::asio::error::eof) {
+        } else if (bytesTransferred == 0 || err == ASIO::error::eof) {
             LOG_DEBUG(cnxString_ << "Server closed the connection: " << 
err.message());
         } else {
             LOG_ERROR(cnxString_ << "Read operation failed: " << 
err.message());
@@ -689,11 +664,11 @@ void ClientConnection::handleRead(const 
boost::system::error_code& err, size_t b
         SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
         auto self = shared_from_this();
         auto nextMinReadSize = minReadSize - bytesTransferred;
-        asyncReceive(buffer.asio_buffer(), customAllocReadHandler([this, self, 
nextMinReadSize](
-                                                                      const 
boost::system::error_code& err,
-                                                                      size_t 
bytesTransferred) {
-                         handleRead(err, bytesTransferred, nextMinReadSize);
-                     }));
+        asyncReceive(buffer.asio_buffer(),
+                     customAllocReadHandler(
+                         [this, self, nextMinReadSize](const ASIO_ERROR& err, 
size_t bytesTransferred) {
+                             handleRead(err, bytesTransferred, 
nextMinReadSize);
+                         }));
     } else {
         processIncomingBuffer();
     }
@@ -720,12 +695,11 @@ void ClientConnection::processIncomingBuffer() {
                 incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, 
newBufferSize);
             }
             auto self = shared_from_this();
-            asyncReceive(
-                incomingBuffer_.asio_buffer(),
-                customAllocReadHandler([this, self, bytesToReceive](const 
boost::system::error_code& err,
-                                                                    size_t 
bytesTransferred) {
-                    handleRead(err, bytesTransferred, bytesToReceive);
-                }));
+            asyncReceive(incomingBuffer_.asio_buffer(),
+                         customAllocReadHandler(
+                             [this, self, bytesToReceive](const ASIO_ERROR& 
err, size_t bytesTransferred) {
+                                 handleRead(err, bytesTransferred, 
bytesToReceive);
+                             }));
             return;
         }
 
@@ -803,11 +777,11 @@ void ClientConnection::processIncomingBuffer() {
         uint32_t minReadSize = sizeof(uint32_t) - 
incomingBuffer_.readableBytes();
 
         auto self = shared_from_this();
-        asyncReceive(incomingBuffer_.asio_buffer(),
-                     customAllocReadHandler([this, self, minReadSize](const 
boost::system::error_code& err,
-                                                                      size_t 
bytesTransferred) {
-                         handleRead(err, bytesTransferred, minReadSize);
-                     }));
+        asyncReceive(
+            incomingBuffer_.asio_buffer(),
+            customAllocReadHandler([this, self, minReadSize](const ASIO_ERROR& 
err, size_t bytesTransferred) {
+                handleRead(err, bytesTransferred, minReadSize);
+            }));
         return;
     }
 
@@ -1056,7 +1030,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, 
const uint64_t request
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_from_now(operationsTimeout_);
     auto weakSelf = weak_from_this();
-    requestData.timer->async_wait([weakSelf, requestData](const 
boost::system::error_code& ec) {
+    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
         auto self = weakSelf.lock();
         if (self) {
             self->handleLookupTimeout(ec, requestData);
@@ -1082,11 +1056,7 @@ void ClientConnection::sendCommand(const SharedBuffer& 
cmd) {
                     self->sendCommandInternal(cmd);
                 }
             };
-#if BOOST_VERSION >= 106600
-            boost::asio::post(strand_, callback);
-#else
-            strand_.post(callback);
-#endif
+            ASIO::post(strand_, callback);
         } else {
             sendCommandInternal(cmd);
         }
@@ -1099,8 +1069,9 @@ void ClientConnection::sendCommand(const SharedBuffer& 
cmd) {
 void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
     auto self = shared_from_this();
     asyncWrite(cmd.const_asio_buffer(),
-               customAllocWriteHandler([this, self, cmd](const 
boost::system::error_code& err,
-                                                         size_t 
bytesTransferred) { handleSend(err, cmd); }));
+               customAllocWriteHandler([this, self, cmd](const ASIO_ERROR& 
err, size_t bytesTransferred) {
+                   handleSend(err, cmd);
+               }));
 }
 
 void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) 
{
@@ -1116,21 +1087,18 @@ void ClientConnection::sendMessage(const 
std::shared_ptr<SendArguments>& args) {
         // Capture the buffer because asio does not copy the buffer, if the 
buffer is destroyed before the
         // callback is called, an invalid buffer range might be passed to the 
underlying socket send.
         asyncWrite(buffer, customAllocWriteHandler(
-                               [this, self, buffer](const 
boost::system::error_code& err,
-                                                    size_t bytesTransferred) { 
handleSendPair(err); }));
+                               [this, self, buffer](const ASIO_ERROR& err, 
size_t bytesTransferred) {
+                                   handleSendPair(err);
+                               }));
     };
     if (tlsSocket_) {
-#if BOOST_VERSION >= 106600
-        boost::asio::post(strand_, sendMessageInternal);
-#else
-        strand_.post(sendMessageInternal);
-#endif
+        ASIO::post(strand_, sendMessageInternal);
     } else {
         sendMessageInternal();
     }
 }
 
-void ClientConnection::handleSend(const boost::system::error_code& err, const 
SharedBuffer&) {
+void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) {
     if (isClosed()) {
         return;
     }
@@ -1142,7 +1110,7 @@ void ClientConnection::handleSend(const 
boost::system::error_code& err, const Sh
     }
 }
 
-void ClientConnection::handleSendPair(const boost::system::error_code& err) {
+void ClientConnection::handleSendPair(const ASIO_ERROR& err) {
     if (isClosed()) {
         return;
     }
@@ -1166,8 +1134,8 @@ void ClientConnection::sendPendingCommands() {
         if (any.type() == typeid(SharedBuffer)) {
             SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
             asyncWrite(buffer.const_asio_buffer(),
-                       customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err,
-                                                                    size_t) { 
handleSend(err, buffer); }));
+                       customAllocWriteHandler(
+                           [this, self, buffer](const ASIO_ERROR& err, size_t) 
{ handleSend(err, buffer); }));
         } else {
             assert(any.type() == typeid(std::shared_ptr<SendArguments>));
 
@@ -1178,9 +1146,9 @@ void ClientConnection::sendPendingCommands() {
 
             // Capture the buffer because asio does not copy the buffer, if 
the buffer is destroyed before the
             // callback is called, an invalid buffer range might be passed to 
the underlying socket send.
-            asyncWrite(buffer,
-                       customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err,
-                                                                    size_t) { 
handleSendPair(err); }));
+            asyncWrite(buffer, customAllocWriteHandler([this, self, 
buffer](const ASIO_ERROR& err, size_t) {
+                           handleSendPair(err);
+                       }));
         }
     } else {
         // No more pending writes
@@ -1202,7 +1170,7 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(SharedBuffer cm
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_from_now(operationsTimeout_);
     auto weakSelf = weak_from_this();
-    requestData.timer->async_wait([weakSelf, requestData](const 
boost::system::error_code& ec) {
+    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
         auto self = weakSelf.lock();
         if (self) {
             self->handleRequestTimeout(ec, requestData);
@@ -1216,21 +1184,19 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(SharedBuffer cm
     return requestData.promise.getFuture();
 }
 
-void ClientConnection::handleRequestTimeout(const boost::system::error_code& 
ec,
-                                            PendingRequestData 
pendingRequestData) {
+void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec, 
PendingRequestData pendingRequestData) {
     if (!ec && !pendingRequestData.hasGotResponse->load()) {
         pendingRequestData.promise.setFailed(ResultTimeout);
     }
 }
 
-void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec,
-                                           LookupRequestData 
pendingRequestData) {
+void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec, 
LookupRequestData pendingRequestData) {
     if (!ec) {
         pendingRequestData.promise->setFailed(ResultTimeout);
     }
 }
 
-void ClientConnection::handleGetLastMessageIdTimeout(const 
boost::system::error_code& ec,
+void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
                                                      
ClientConnection::LastMessageIdRequestData data) {
     if (!ec) {
         data.promise->setFailed(ResultTimeout);
@@ -1255,9 +1221,9 @@ void ClientConnection::handleKeepAliveTimeout() {
         // be zero And we do not attempt to dereference the pointer.
         Lock lock(mutex_);
         if (keepAliveTimer_) {
-            
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            
keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
             auto weakSelf = weak_from_this();
-            keepAliveTimer_->async_wait([weakSelf](const 
boost::system::error_code&) {
+            keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
                 auto self = weakSelf.lock();
                 if (self) {
                     self->handleKeepAliveTimeout();
@@ -1268,7 +1234,7 @@ void ClientConnection::handleKeepAliveTimeout() {
     }
 }
 
-void ClientConnection::handleConsumerStatsTimeout(const 
boost::system::error_code& ec,
+void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
                                                   std::vector<uint64_t> 
consumerStatsRequests) {
     if (ec) {
         LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec 
<< "]");
@@ -1285,15 +1251,15 @@ void ClientConnection::close(Result result, bool 
detach) {
     state_ = Disconnected;
 
     if (socket_) {
-        boost::system::error_code err;
-        socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
+        ASIO_ERROR err;
+        socket_->shutdown(ASIO::socket_base::shutdown_both, err);
         socket_->close(err);
         if (err) {
             LOG_WARN(cnxString_ << "Failed to close socket: " << 
err.message());
         }
     }
     if (tlsSocket_) {
-        boost::system::error_code err;
+        ASIO_ERROR err;
         tlsSocket_->lowest_layer().close(err);
         if (err) {
             LOG_WARN(cnxString_ << "Failed to close TLS socket: " << 
err.message());
@@ -1432,7 +1398,7 @@ Future<Result, GetLastMessageIdResponse> 
ClientConnection::newGetLastMessageId(u
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_from_now(operationsTimeout_);
     auto weakSelf = weak_from_this();
-    requestData.timer->async_wait([weakSelf, requestData](const 
boost::system::error_code& ec) {
+    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
         auto self = weakSelf.lock();
         if (self) {
             self->handleGetLastMessageIdTimeout(ec, requestData);
@@ -1823,7 +1789,7 @@ void ClientConnection::handleAuthChallenge() {
     }
     auto self = shared_from_this();
     asyncWrite(buffer.const_asio_buffer(),
-               customAllocWriteHandler([this, self, buffer](const 
boost::system::error_code& err, size_t) {
+               customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& 
err, size_t) {
                    handleSentAuthResponse(err, buffer);
                }));
 }
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 1bc1bd8..69155fd 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -23,13 +23,20 @@
 #include <pulsar/defines.h>
 
 #include <atomic>
-#include <boost/any.hpp>
+#ifdef USE_ASIO
+#include <asio/bind_executor.hpp>
+#include <asio/io_service.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/ssl/stream.hpp>
+#include <asio/strand.hpp>
+#else
 #include <boost/asio/bind_executor.hpp>
-#include <boost/asio/deadline_timer.hpp>
 #include <boost/asio/io_service.hpp>
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl/stream.hpp>
 #include <boost/asio/strand.hpp>
+#endif
+#include <boost/any.hpp>
 #include <boost/optional.hpp>
 #include <deque>
 #include <functional>
@@ -37,19 +44,18 @@
 #include <string>
 #include <vector>
 
+#include "AsioTimer.h"
 #include "Commands.h"
 #include "GetLastMessageIdResponse.h"
 #include "LookupDataResult.h"
 #include "SharedBuffer.h"
+#include "TimeUtils.h"
 #include "UtilAllocator.h"
-
 namespace pulsar {
 
 class PulsarFriend;
 
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
-using TimeDuration = boost::posix_time::time_duration;
-using TcpResolverPtr = std::shared_ptr<boost::asio::ip::tcp::resolver>;
+using TcpResolverPtr = std::shared_ptr<ASIO::ip::tcp::resolver>;
 
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
@@ -114,10 +120,10 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     };
 
    public:
-    typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
-    typedef 
std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>> 
TlsSocketPtr;
+    typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
+    typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket&>> 
TlsSocketPtr;
     typedef std::shared_ptr<ClientConnection> ConnectionPtr;
-    typedef std::function<void(const boost::system::error_code&, 
ConnectionPtr)> ConnectionListener;
+    typedef std::function<void(const ASIO_ERROR&, ConnectionPtr)> 
ConnectionListener;
     typedef std::vector<ConnectionListener>::iterator ListenerIterator;
 
     /*
@@ -224,17 +230,16 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      * although not usable at this point, since this is just tcp connection
      * Pulsar - Connect/Connected has yet to happen
      */
-    void handleTcpConnected(const boost::system::error_code& err,
-                            boost::asio::ip::tcp::resolver::iterator 
endpointIterator);
+    void handleTcpConnected(const ASIO_ERROR& err, 
ASIO::ip::tcp::resolver::iterator endpointIterator);
 
-    void handleHandshake(const boost::system::error_code& err);
+    void handleHandshake(const ASIO_ERROR& err);
 
-    void handleSentPulsarConnect(const boost::system::error_code& err, const 
SharedBuffer& buffer);
-    void handleSentAuthResponse(const boost::system::error_code& err, const 
SharedBuffer& buffer);
+    void handleSentPulsarConnect(const ASIO_ERROR& err, const SharedBuffer& 
buffer);
+    void handleSentAuthResponse(const ASIO_ERROR& err, const SharedBuffer& 
buffer);
 
     void readNextCommand();
 
-    void handleRead(const boost::system::error_code& err, size_t 
bytesTransferred, uint32_t minReadSize);
+    void handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t 
minReadSize);
 
     void processIncomingBuffer();
     bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& 
remainingBytes,
@@ -248,19 +253,18 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
 
-    void handleResolve(const boost::system::error_code& err,
-                       boost::asio::ip::tcp::resolver::iterator 
endpointIterator);
+    void handleResolve(const ASIO_ERROR& err, 
ASIO::ip::tcp::resolver::iterator endpointIterator);
 
-    void handleSend(const boost::system::error_code& err, const SharedBuffer& 
cmd);
-    void handleSendPair(const boost::system::error_code& err);
+    void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
+    void handleSendPair(const ASIO_ERROR& err);
     void sendPendingCommands();
     void newLookup(const SharedBuffer& cmd, const uint64_t requestId, 
LookupDataResultPromisePtr promise);
 
-    void handleRequestTimeout(const boost::system::error_code& ec, 
PendingRequestData pendingRequestData);
+    void handleRequestTimeout(const ASIO_ERROR& ec, PendingRequestData 
pendingRequestData);
 
-    void handleLookupTimeout(const boost::system::error_code&, 
LookupRequestData);
+    void handleLookupTimeout(const ASIO_ERROR&, LookupRequestData);
 
-    void handleGetLastMessageIdTimeout(const boost::system::error_code&, 
LastMessageIdRequestData data);
+    void handleGetLastMessageIdTimeout(const ASIO_ERROR&, 
LastMessageIdRequestData data);
 
     void handleKeepAliveTimeout();
 
@@ -280,13 +284,9 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
             return;
         }
         if (tlsSocket_) {
-#if BOOST_VERSION >= 106600
-            boost::asio::async_write(*tlsSocket_, buffers, 
boost::asio::bind_executor(strand_, handler));
-#else
-            boost::asio::async_write(*tlsSocket_, buffers, 
strand_.wrap(handler));
-#endif
+            ASIO::async_write(*tlsSocket_, buffers, 
ASIO::bind_executor(strand_, handler));
         } else {
-            boost::asio::async_write(*socket_, buffers, handler);
+            ASIO::async_write(*socket_, buffers, handler);
         }
     }
 
@@ -296,11 +296,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
             return;
         }
         if (tlsSocket_) {
-#if BOOST_VERSION >= 106600
-            tlsSocket_->async_read_some(buffers, 
boost::asio::bind_executor(strand_, handler));
-#else
-            tlsSocket_->async_read_some(buffers, strand_.wrap(handler));
-#endif
+            tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, 
handler));
         } else {
             socket_->async_receive(buffers, handler);
         }
@@ -321,11 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      */
     SocketPtr socket_;
     TlsSocketPtr tlsSocket_;
-#if BOOST_VERSION >= 106600
-    boost::asio::strand<boost::asio::io_service::executor_type> strand_;
-#else
-    boost::asio::io_service::strand strand_;
-#endif
+    ASIO::strand<ASIO::io_service::executor_type> strand_;
 
     const std::string logicalAddress_;
     /*
@@ -343,7 +335,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     /*
      *  indicates if async connection establishment failed
      */
-    boost::system::error_code error_;
+    ASIO_ERROR error_;
 
     SharedBuffer incomingBuffer_;
 
@@ -392,8 +384,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     DeadlineTimerPtr keepAliveTimer_;
     DeadlineTimerPtr consumerStatsRequestTimer_;
 
-    void handleConsumerStatsTimeout(const boost::system::error_code& ec,
-                                    std::vector<uint64_t> 
consumerStatsRequests);
+    void handleConsumerStatsTimeout(const ASIO_ERROR& ec, 
std::vector<uint64_t> consumerStatsRequests);
 
     void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
     uint32_t maxPendingLookupRequest_;
diff --git a/lib/CompressionCodec.cc b/lib/CompressionCodec.cc
index 991d52c..6105887 100644
--- a/lib/CompressionCodec.cc
+++ b/lib/CompressionCodec.cc
@@ -45,7 +45,6 @@ CompressionCodec& 
CompressionCodecProvider::getCodec(CompressionType compression
         default:
             return compressionCodecNone_;
     }
-    BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType 
enumeration value"));
 }
 
 SharedBuffer CompressionCodecNone::encode(const SharedBuffer& raw) { return 
raw; }
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index 95170a9..4cc8883 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -18,15 +18,20 @@
  */
 #include "ConnectionPool.h"
 
+#ifdef USE_ASIO
+#include <asio/ip/tcp.hpp>
+#include <asio/ssl.hpp>
+#else
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl.hpp>
+#endif
 
 #include "ClientConnection.h"
 #include "ExecutorService.h"
 #include "LogUtils.h"
 
-using boost::asio::ip::tcp;
-namespace ssl = boost::asio::ssl;
+using ASIO::ip::tcp;
+namespace ssl = ASIO::ssl;
 typedef ssl::stream<tcp::socket> ssl_socket;
 
 DECLARE_LOG_OBJECT()
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index f9770d8..5216218 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -26,6 +26,7 @@
 #include "AckGroupingTracker.h"
 #include "AckGroupingTrackerDisabled.h"
 #include "AckGroupingTrackerEnabled.h"
+#include "AsioDefines.h"
 #include "BatchMessageAcker.h"
 #include "BatchedMessageIdImpl.h"
 #include "BitSet.h"
@@ -55,6 +56,9 @@ namespace pulsar {
 
 DECLARE_LOG_OBJECT()
 
+using std::chrono::milliseconds;
+using std::chrono::seconds;
+
 ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& 
topic,
                            const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
                            bool isPersistent, const ConsumerInterceptorsPtr& 
interceptors,
@@ -402,10 +406,9 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, 
MessageId messageId, b
 }
 
 void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
-    checkExpiredChunkedTimer_->expires_from_now(
-        
boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    
checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
     std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
-    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const 
boost::system::error_code& ec) -> void {
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& 
ec) -> void {
         auto self = weakSelf.lock();
         if (!self) {
             return;
@@ -1581,7 +1584,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const 
BackoffPtr& backoff, Time
         }
     } else {
         TimeDuration next = std::min(remainTime, backoff->next());
-        if (next.total_milliseconds() <= 0) {
+        if (toMillis(next) <= 0) {
             LOG_ERROR(getName() << " Client Connection not ready for 
Consumer");
             callback(ResultNotConnected, MessageId());
             return;
@@ -1592,8 +1595,8 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const 
BackoffPtr& backoff, Time
 
         auto self = shared_from_this();
         timer->async_wait([this, backoff, remainTime, timer, next, callback,
-                           self](const boost::system::error_code& ec) -> void {
-            if (ec == boost::asio::error::operation_aborted) {
+                           self](const ASIO_ERROR& ec) -> void {
+            if (ec == ASIO::error::operation_aborted) {
                 LOG_DEBUG(getName() << " Get last message id operation was 
cancelled, code[" << ec << "].");
                 return;
             }
@@ -1602,7 +1605,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const 
BackoffPtr& backoff, Time
                 return;
             }
             LOG_WARN(getName() << " Could not get connection while 
getLastMessageId -- Will try again in "
-                               << next.total_milliseconds() << " ms")
+                               << toMillis(next) << " ms")
             this->internalGetLastMessageIdAsync(backoff, remainTime, timer, 
callback);
         });
     }
@@ -1693,7 +1696,7 @@ std::shared_ptr<ConsumerImpl> 
ConsumerImpl::get_shared_this_ptr() {
 }
 
 void ConsumerImpl::cancelTimers() noexcept {
-    boost::system::error_code ec;
+    ASIO_ERROR ec;
     batchReceiveTimer_->cancel(ec);
     checkExpiredChunkedTimer_->cancel(ec);
     unAckedMessageTrackerPtr_->stop();
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 3921237..851d41e 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -51,9 +51,9 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, 
const std::string& topi
 
 void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
     if (timeoutMs > 0) {
-        
batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        
batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
         std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
-        batchReceiveTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+        batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
             auto self = weakSelf.lock();
             if (self && !ec) {
                 self->doBatchReceiveTimeTask();
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index 53be8ea..794e361 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -32,7 +32,7 @@ void ExecutorService::start() {
     auto self = shared_from_this();
     std::thread t{[this, self] {
         LOG_DEBUG("Run io_service in a single thread");
-        boost::system::error_code ec;
+        ASIO_ERROR ec;
         while (!closed_) {
             io_service_.restart();
             IOService::work work{getIOService()};
@@ -63,22 +63,22 @@ ExecutorServicePtr ExecutorService::create() {
 }
 
 /*
- *  factory method of boost::asio::ip::tcp::socket associated with io_service_ 
instance
+ *  factory method of ASIO::ip::tcp::socket associated with io_service_ 
instance
  *  @ returns shared_ptr to this socket
  */
 SocketPtr ExecutorService::createSocket() {
     try {
-        return SocketPtr(new boost::asio::ip::tcp::socket(io_service_));
-    } catch (const boost::system::system_error &e) {
+        return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
+    } catch (const ASIO_SYSTEM_ERROR &e) {
         restart();
         auto error = std::string("Failed to create socket: ") + e.what();
         throw std::runtime_error(error);
     }
 }
 
-TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, 
boost::asio::ssl::context &ctx) {
-    return 
std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>>(
-        new boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>(*socket, 
ctx));
+TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, 
ASIO::ssl::context &ctx) {
+    return std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &>>(
+        new ASIO::ssl::stream<ASIO::ip::tcp::socket &>(*socket, ctx));
 }
 
 /*
@@ -87,8 +87,8 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr 
&socket, boost::asio::ss
  */
 TcpResolverPtr ExecutorService::createTcpResolver() {
     try {
-        return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_));
-    } catch (const boost::system::system_error &e) {
+        return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
+    } catch (const ASIO_SYSTEM_ERROR &e) {
         restart();
         auto error = std::string("Failed to create resolver: ") + e.what();
         throw std::runtime_error(error);
@@ -97,10 +97,10 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
 
 DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
     try {
-        return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
-    } catch (const boost::system::system_error &e) {
+        return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
+    } catch (const ASIO_SYSTEM_ERROR &e) {
         restart();
-        auto error = std::string("Failed to create deadline_timer: ") + 
e.what();
+        auto error = std::string("Failed to create steady_timer: ") + e.what();
         throw std::runtime_error(error);
     }
 }
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index a373c0a..89d06d3 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -22,10 +22,15 @@
 #include <pulsar/defines.h>
 
 #include <atomic>
-#include <boost/asio/deadline_timer.hpp>
+#ifdef USE_ASIO
+#include <asio/io_service.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/ssl.hpp>
+#else
 #include <boost/asio/io_service.hpp>
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl.hpp>
+#endif
 #include <chrono>
 #include <condition_variable>
 #include <functional>
@@ -33,14 +38,15 @@
 #include <mutex>
 #include <thread>
 
+#include "AsioTimer.h"
+
 namespace pulsar {
-typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
-typedef std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket 
&> > TlsSocketPtr;
-typedef std::shared_ptr<boost::asio::ip::tcp::resolver> TcpResolverPtr;
-typedef std::shared_ptr<boost::asio::deadline_timer> DeadlineTimerPtr;
+typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
+typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > 
TlsSocketPtr;
+typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
 class PULSAR_PUBLIC ExecutorService : public 
std::enable_shared_from_this<ExecutorService> {
    public:
-    using IOService = boost::asio::io_service;
+    using IOService = ASIO::io_service;
     using SharedPtr = std::shared_ptr<ExecutorService>;
 
     static SharedPtr create();
@@ -51,7 +57,7 @@ class PULSAR_PUBLIC ExecutorService : public 
std::enable_shared_from_this<Execut
 
     // throws std::runtime_error if failed
     SocketPtr createSocket();
-    static TlsSocketPtr createTlsSocket(SocketPtr &socket, 
boost::asio::ssl::context &ctx);
+    static TlsSocketPtr createTlsSocket(SocketPtr &socket, ASIO::ssl::context 
&ctx);
     // throws std::runtime_error if failed
     TcpResolverPtr createTcpResolver();
     // throws std::runtime_error if failed
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index b55f751..fa21a57 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -18,6 +18,7 @@
  */
 #include "HandlerBase.h"
 
+#include "Backoff.h"
 #include "ClientConnection.h"
 #include "ClientImpl.h"
 #include "ExecutorService.h"
@@ -36,7 +37,7 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const 
std::string& topic,
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
-      operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
+      
operationTimeut_(std::chrono::seconds(client->conf().getOperationTimeoutSeconds())),
       state_(NotStarted),
       backoff_(backoff),
       epoch_(0),
@@ -147,13 +148,13 @@ void HandlerBase::scheduleReconnection() {
     if (state == Pending || state == Ready) {
         TimeDuration delay = backoff_.next();
 
-        LOG_INFO(getName() << "Schedule reconnection in " << 
(delay.total_milliseconds() / 1000.0) << " s");
+        LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) 
/ 1000.0) << " s");
         timer_->expires_from_now(delay);
         // passing shared_ptr here since time_ will get destroyed, so tasks 
will be cancelled
         // so we will not run into the case where grabCnx is invoked on out of 
scope handler
         auto name = getName();
         std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
-        timer_->async_wait([name, weakSelf](const boost::system::error_code& 
ec) {
+        timer_->async_wait([name, weakSelf](const ASIO_ERROR& ec) {
             auto self = weakSelf.lock();
             if (self) {
                 self->handleTimeout(ec);
@@ -164,7 +165,7 @@ void HandlerBase::scheduleReconnection() {
     }
 }
 
-void HandlerBase::handleTimeout(const boost::system::error_code& ec) {
+void HandlerBase::handleTimeout(const ASIO_ERROR& ec) {
     if (ec) {
         LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec 
<< "]");
         return;
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index f62c4df..68c0b6a 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -20,20 +20,17 @@
 #define _PULSAR_HANDLER_BASE_HEADER_
 #include <pulsar/Result.h>
 
-#include <boost/asio/deadline_timer.hpp>
 #include <memory>
 #include <mutex>
 #include <string>
 
+#include "AsioTimer.h"
 #include "Backoff.h"
 #include "Future.h"
+#include "TimeUtils.h"
 
 namespace pulsar {
 
-using namespace boost::posix_time;
-using boost::posix_time::milliseconds;
-using boost::posix_time::seconds;
-
 class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
 using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
@@ -42,7 +39,6 @@ using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
 using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 
 class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
    public:
@@ -95,7 +91,7 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
 
     void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
 
-    void handleTimeout(const boost::system::error_code& ec);
+    void handleTimeout(const ASIO_ERROR& ec);
 
    protected:
     ClientImplWeakPtr client_;
diff --git a/lib/Int64SerDes.h b/lib/Int64SerDes.h
index dbc5d8a..f1f5eef 100644
--- a/lib/Int64SerDes.h
+++ b/lib/Int64SerDes.h
@@ -20,7 +20,12 @@
 
 #include <stdint.h>
 
-#include <boost/asio.hpp>  // for ntohl
+// for ntohl
+#ifdef USE_ASIO
+#include <asio/detail/socket_ops.hpp>
+#else
+#include <boost/asio/detail/socket_ops.hpp>
+#endif
 
 namespace pulsar {
 
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 15f9d9b..af70623 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -18,6 +18,7 @@
  */
 #include "MultiTopicsConsumerImpl.h"
 
+#include <chrono>
 #include <stdexcept>
 
 #include "ClientImpl.h"
@@ -37,6 +38,9 @@ DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
+using std::chrono::milliseconds;
+using std::chrono::seconds;
+
 MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, 
TopicNamePtr topicName,
                                                  int numPartitions, const 
std::string& subscriptionName,
                                                  const ConsumerConfiguration& 
conf,
@@ -90,7 +94,7 @@ 
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
     auto partitionsUpdateInterval = static_cast<unsigned 
int>(client->conf().getPartitionsUpdateInterval());
     if (partitionsUpdateInterval > 0) {
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
-        partitionsUpdateInterval_ = 
boost::posix_time::seconds(partitionsUpdateInterval);
+        partitionsUpdateInterval_ = seconds(partitionsUpdateInterval);
         lookupServicePtr_ = client->getLookup();
     }
 
@@ -936,7 +940,7 @@ uint64_t 
MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
 void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
     auto weakSelf = weak_from_this();
-    partitionsUpdateTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+    partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
         // If two requests call runPartitionUpdateTask at the same time, the 
timer will fail, and it
         // cannot continue at this time, and the request needs to be ignored.
         auto self = weakSelf.lock();
@@ -1087,7 +1091,7 @@ void 
MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
 
 void MultiTopicsConsumerImpl::cancelTimers() noexcept {
     if (partitionsUpdateTimer_) {
-        boost::system::error_code ec;
+        ASIO_ERROR ec;
         partitionsUpdateTimer_->cancel(ec);
     }
 }
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index d4127f6..c5834ea 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -32,6 +32,7 @@
 #include "LookupDataResult.h"
 #include "SynchronizedHashMap.h"
 #include "TestUtil.h"
+#include "TimeUtils.h"
 #include "UnboundedBlockingQueue.h"
 
 namespace pulsar {
@@ -119,7 +120,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     std::atomic_int incomingMessagesSize_ = {0};
     MessageListener messageListener_;
     DeadlineTimerPtr partitionsUpdateTimer_;
-    boost::posix_time::time_duration partitionsUpdateInterval_;
+    TimeDuration partitionsUpdateInterval_;
     LookupServicePtr lookupServicePtr_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
     std::atomic<Result> failedResult{ResultOk};
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 0dd7358..e443496 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -40,9 +40,9 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr 
client, ConsumerImpl &con
 
     nackDelay_ =
         
std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), 
MIN_NACK_DELAY_MILLIS));
-    timerInterval_ = boost::posix_time::milliseconds((long)(nackDelay_.count() 
/ 3));
-    LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count()
-                                                          << " ms - Timer 
interval: " << timerInterval_);
+    timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3));
+    LOG_DEBUG("Created negative ack tracker with delay: " << 
nackDelay_.count() << " ms - Timer interval: "
+                                                          << 
timerInterval_.count());
 }
 
 void NegativeAcksTracker::scheduleTimer() {
@@ -51,14 +51,14 @@ void NegativeAcksTracker::scheduleTimer() {
     }
     std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
     timer_->expires_from_now(timerInterval_);
-    timer_->async_wait([weakSelf](const boost::system::error_code &ec) {
+    timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
         if (auto self = weakSelf.lock()) {
             self->handleTimer(ec);
         }
     });
 }
 
-void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
+void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
     if (ec) {
         // Ignore cancelled events
         return;
@@ -107,7 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
 
 void NegativeAcksTracker::close() {
     closed_ = true;
-    boost::system::error_code ec;
+    ASIO_ERROR ec;
     timer_->cancel(ec);
     std::lock_guard<std::mutex> lock(mutex_);
     nackedMessages_.clear();
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index 4b48984..472e976 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -23,12 +23,13 @@
 #include <pulsar/MessageId.h>
 
 #include <atomic>
-#include <boost/asio/deadline_timer.hpp>
 #include <chrono>
 #include <map>
 #include <memory>
 #include <mutex>
 
+#include "AsioDefines.h"
+#include "AsioTimer.h"
 #include "TestUtil.h"
 
 namespace pulsar {
@@ -36,7 +37,6 @@ namespace pulsar {
 class ConsumerImpl;
 class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
 
@@ -56,13 +56,13 @@ class NegativeAcksTracker : public 
std::enable_shared_from_this<NegativeAcksTrac
 
    private:
     void scheduleTimer();
-    void handleTimer(const boost::system::error_code &ec);
+    void handleTimer(const ASIO_ERROR &ec);
 
     ConsumerImpl &consumer_;
     std::mutex mutex_;
 
     std::chrono::milliseconds nackDelay_;
-    boost::posix_time::milliseconds timerInterval_;
+    std::chrono::milliseconds timerInterval_;
     typedef typename std::chrono::steady_clock Clock;
     std::map<MessageId, Clock::time_point> nackedMessages_;
 
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index a1319e1..46fd9c1 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -23,8 +23,6 @@
 #include <pulsar/Producer.h>
 #include <pulsar/Result.h>
 
-#include <boost/date_time/posix_time/ptime.hpp>
-
 #include "ChunkMessageIdImpl.h"
 #include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
@@ -53,7 +51,7 @@ struct OpSendMsg {
     const int32_t numChunks;
     const uint32_t messagesCount;
     const uint64_t messagesSize;
-    const boost::posix_time::ptime timeout;
+    const ptime timeout;
     const SendCallback sendCallback;
     std::vector<std::function<void(Result)>> trackerCallbacks;
     ChunkMessageIdListPtr chunkMessageIdList;
@@ -98,7 +96,7 @@ struct OpSendMsg {
           numChunks(metadata.num_chunks_from_msg()),
           messagesCount(messagesCount),
           messagesSize(messagesSize),
-          timeout(TimeUtils::now() + 
boost::posix_time::milliseconds(sendTimeoutMs)),
+          timeout(TimeUtils::now() + std::chrono::milliseconds(sendTimeoutMs)),
           sendCallback(std::move(callback)),
           chunkMessageIdList(std::move(chunkMessageIdList)),
           sendArgs(new SendArguments(producerId, metadata.sequence_id(), 
metadata, payload)) {}
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 54e96c8..4178096 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -58,7 +58,7 @@ 
PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
     if (partitionsUpdateInterval > 0) {
         listenerExecutor_ = client->getListenerExecutorProvider()->get();
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
-        partitionsUpdateInterval_ = 
boost::posix_time::seconds(partitionsUpdateInterval);
+        partitionsUpdateInterval_ = 
std::chrono::seconds(partitionsUpdateInterval);
         lookupServicePtr_ = client->getLookup();
     }
 }
@@ -69,7 +69,7 @@ MessageRoutingPolicyPtr 
PartitionedProducerImpl::getMessageRouter() {
             return std::make_shared<RoundRobinMessageRouter>(
                 conf_.getHashingScheme(), conf_.getBatchingEnabled(), 
conf_.getBatchingMaxMessages(),
                 conf_.getBatchingMaxAllowedSizeInBytes(),
-                
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+                
std::chrono::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
         case ProducerConfiguration::CustomPartition:
             return conf_.getMessageRouterPtr();
         case ProducerConfiguration::UseSinglePartition:
@@ -422,7 +422,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback 
callback) {
 void PartitionedProducerImpl::runPartitionUpdateTask() {
     auto weakSelf = weak_from_this();
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    partitionsUpdateTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+    partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (self) {
             self->getPartitionMetadata();
@@ -524,7 +524,7 @@ uint64_t 
PartitionedProducerImpl::getNumberOfConnectedProducer() {
 
 void PartitionedProducerImpl::cancelTimers() noexcept {
     if (partitionsUpdateTimer_) {
-        boost::system::error_code ec;
+        ASIO_ERROR ec;
         partitionsUpdateTimer_->cancel(ec);
     }
 }
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 2d07a81..610c74e 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -20,21 +20,21 @@
 #include <pulsar/TopicMetadata.h>
 
 #include <atomic>
-#include <boost/asio/deadline_timer.hpp>
 #include <memory>
 #include <mutex>
 #include <vector>
 
+#include "AsioTimer.h"
 #include "LookupDataResult.h"
 #include "ProducerImplBase.h"
 #include "ProducerInterceptors.h"
+#include "TimeUtils.h"
 
 namespace pulsar {
 
 class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
 using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
 class LookupService;
@@ -128,7 +128,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     ExecutorServicePtr listenerExecutor_;
     DeadlineTimerPtr partitionsUpdateTimer_;
-    boost::posix_time::time_duration partitionsUpdateInterval_;
+    TimeDuration partitionsUpdateInterval_;
     LookupServicePtr lookupServicePtr_;
 
     ProducerInterceptorsPtr interceptors_;
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc 
b/lib/PatternMultiTopicsConsumerImpl.cc
index 23e445e..4fc7bb6 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -27,6 +27,8 @@ DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
+using std::chrono::seconds;
+
 PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
     ClientImplPtr client, const std::string pattern, 
CommandGetTopicsOfNamespace_Mode getTopicsMode,
     const std::vector<std::string>& topics, const std::string& 
subscriptionName,
@@ -49,15 +51,15 @@ void 
PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
     
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
 
     auto weakSelf = weak_from_this();
-    autoDiscoveryTimer_->async_wait([weakSelf](const 
boost::system::error_code& err) {
+    autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
         if (auto self = weakSelf.lock()) {
             self->autoDiscoveryTimerTask(err);
         }
     });
 }
 
-void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const 
boost::system::error_code& err) {
-    if (err == boost::asio::error::operation_aborted) {
+void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& 
err) {
+    if (err == ASIO::error::operation_aborted) {
         LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
         return;
     } else if (err) {
@@ -228,7 +230,7 @@ void PatternMultiTopicsConsumerImpl::start() {
     if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
         
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
         auto weakSelf = weak_from_this();
-        autoDiscoveryTimer_->async_wait([weakSelf](const 
boost::system::error_code& err) {
+        autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
             if (auto self = weakSelf.lock()) {
                 self->autoDiscoveryTimerTask(err);
             }
@@ -247,6 +249,6 @@ void 
PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 }
 
 void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
-    boost::system::error_code ec;
+    ASIO_ERROR ec;
     autoDiscoveryTimer_->cancel(ec);
 }
diff --git a/lib/PatternMultiTopicsConsumerImpl.h 
b/lib/PatternMultiTopicsConsumerImpl.h
index 5d3ba9e..f272df2 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -22,6 +22,7 @@
 #include <string>
 #include <vector>
 
+#include "AsioTimer.h"
 #include "LookupDataResult.h"
 #include "MultiTopicsConsumerImpl.h"
 #include "NamespaceName.h"
@@ -56,7 +57,7 @@ class PatternMultiTopicsConsumerImpl : public 
MultiTopicsConsumerImpl {
 
     const PULSAR_REGEX_NAMESPACE::regex getPattern();
 
-    void autoDiscoveryTimerTask(const boost::system::error_code& err);
+    void autoDiscoveryTimerTask(const ASIO_ERROR& err);
 
     // filter input `topics` with given `pattern`, return matched topics. Do 
not match topic domain.
     static NamespaceTopicsPtr topicsPatternFilter(const 
std::vector<std::string>& topics,
@@ -74,7 +75,7 @@ class PatternMultiTopicsConsumerImpl : public 
MultiTopicsConsumerImpl {
     const std::string patternString_;
     const PULSAR_REGEX_NAMESPACE::regex pattern_;
     const CommandGetTopicsOfNamespace_Mode getTopicsMode_;
-    typedef std::shared_ptr<boost::asio::deadline_timer> TimerPtr;
+    typedef std::shared_ptr<ASIO::steady_timer> TimerPtr;
     TimerPtr autoDiscoveryTimer_;
     bool autoDiscoveryRunning_;
     NamespaceNamePtr namespaceName_;
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 6046eae..9fde012 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -18,6 +18,8 @@
  */
 #include "PeriodicTask.h"
 
+#include <chrono>
+
 namespace pulsar {
 
 void PeriodicTask::start() {
@@ -27,7 +29,7 @@ void PeriodicTask::start() {
     state_ = Ready;
     if (periodMs_ >= 0) {
         std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
-        timer_->expires_from_now(boost::posix_time::millisec(periodMs_));
+        timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
         timer_->async_wait([weakSelf](const ErrorCode& ec) {
             auto self = weakSelf.lock();
             if (self) {
@@ -48,7 +50,7 @@ void PeriodicTask::stop() noexcept {
 }
 
 void PeriodicTask::handleTimeout(const ErrorCode& ec) {
-    if (state_ != Ready || ec.value() == 
boost::system::errc::operation_canceled) {
+    if (state_ != Ready || ec == ASIO::error::operation_aborted) {
         return;
     }
 
@@ -57,7 +59,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
     // state_ may be changed in handleTimeout, so we check state_ again
     if (state_ == Ready) {
         auto self = shared_from_this();
-        timer_->expires_from_now(boost::posix_time::millisec(periodMs_));
+        timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
         timer_->async_wait([this, self](const ErrorCode& ec) { 
handleTimeout(ec); });
     }
 }
diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h
index 5de81ae..bc18634 100644
--- a/lib/PeriodicTask.h
+++ b/lib/PeriodicTask.h
@@ -36,7 +36,7 @@ namespace pulsar {
  */
 class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
    public:
-    using ErrorCode = boost::system::error_code;
+    using ErrorCode = ASIO_ERROR;
     using CallbackType = std::function<void(const ErrorCode&)>;
 
     enum State : std::uint8_t
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 0a12925..fc39b23 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -20,7 +20,7 @@
 
 #include <pulsar/MessageIdBuilder.h>
 
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <chrono>
 
 #include "BatchMessageContainer.h"
 #include "BatchMessageKeyBasedContainer.h"
@@ -46,6 +46,8 @@
 namespace pulsar {
 DECLARE_LOG_OBJECT()
 
+using std::chrono::milliseconds;
+
 ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
                            const ProducerConfiguration& conf, const 
ProducerInterceptorsPtr& interceptors,
                            int32_t partition, bool retryOnCreationError)
@@ -465,7 +467,7 @@ void ProducerImpl::sendAsync(const Message& msg, 
SendCallback callback) {
     Producer producer = Producer(shared_from_this());
     auto interceptorMessage = interceptors_->beforeSend(producer, msg);
 
-    const auto now = boost::posix_time::microsec_clock::universal_time();
+    const auto now = TimeUtils::now();
     auto self = shared_from_this();
     sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, 
producer, interceptorMessage](
                                                      Result result, const 
MessageId& messageId) {
@@ -564,10 +566,9 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, SendCallback&& c
         bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
         bool isFull = batchMessageContainer_->add(msg, callback);
         if (isFirstMessage) {
-            batchTimer_->expires_from_now(
-                
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+            
batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
             auto weakSelf = weak_from_this();
-            batchTimer_->async_wait([this, weakSelf](const 
boost::system::error_code& ec) {
+            batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
                 auto self = weakSelf.lock();
                 if (!self) {
                     return;
@@ -824,14 +825,14 @@ Future<Result, ProducerImplBaseWeakPtr> 
ProducerImpl::getProducerCreatedFuture()
 
 uint64_t ProducerImpl::getProducerId() const { return producerId_; }
 
-void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
+void ProducerImpl::handleSendTimeout(const ASIO_ERROR& err) {
     const auto state = state_.load();
     if (state != Pending && state != Ready) {
         return;
     }
     Lock lock(mutex_);
 
-    if (err == boost::asio::error::operation_aborted) {
+    if (err == ASIO::error::operation_aborted) {
         LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
         return;
     } else if (err) {
@@ -847,8 +848,8 @@ void ProducerImpl::handleSendTimeout(const 
boost::system::error_code& err) {
     } else {
         // If there is at least one message, calculate the diff between the 
message timeout and
         // the current time.
-        time_duration diff = pendingMessagesQueue_.front()->timeout - 
TimeUtils::now();
-        if (diff.total_milliseconds() <= 0) {
+        auto diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now();
+        if (toMillis(diff) <= 0) {
             // The diff is less than or equal to zero, meaning that the 
message has been expired.
             LOG_DEBUG(getName() << "Timer expired. Calling timeout 
callbacks.");
             pendingMessages = getPendingCallbacksWhenFailed();
@@ -856,7 +857,7 @@ void ProducerImpl::handleSendTimeout(const 
boost::system::error_code& err) {
             asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
         } else {
             // The diff is greater than zero, set the timeout to the diff value
-            LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new 
timeout " << diff);
+            LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new 
timeout " << diff.count());
             asyncWaitSendTimeout(diff);
         }
     }
@@ -1000,7 +1001,7 @@ void ProducerImpl::shutdown() {
 
 void ProducerImpl::cancelTimers() noexcept {
     dataKeyRefreshTask_.stop();
-    boost::system::error_code ec;
+    ASIO_ERROR ec;
     batchTimer_->cancel(ec);
     sendTimer_->cancel(ec);
 }
@@ -1026,7 +1027,7 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType 
expiryTime) {
     sendTimer_->expires_from_now(expiryTime);
 
     auto weakSelf = weak_from_this();
-    sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+    sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
         auto self = weakSelf.lock();
         if (self) {
             
std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index b467458..9781605 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -19,6 +19,12 @@
 #ifndef LIB_PRODUCERIMPL_H_
 #define LIB_PRODUCERIMPL_H_
 
+#include "TimeUtils.h"
+#ifdef USE_ASIO
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/steady_timer.hpp>
+#endif
 #include <atomic>
 #include <boost/optional.hpp>
 #include <list>
@@ -30,6 +36,7 @@
 #if defined(_MSC_VER) || defined(__APPLE__)
 #include "OpSendMsg.h"
 #endif
+#include "AsioDefines.h"
 #include "PendingFailures.h"
 #include "PeriodicTask.h"
 #include "ProducerImplBase.h"
@@ -39,7 +46,7 @@ namespace pulsar {
 class BatchMessageContainerBase;
 class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
+using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
 class MessageCrypto;
 using MessageCryptoPtr = std::shared_ptr<MessageCrypto>;
 class ProducerImpl;
@@ -137,7 +144,7 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
 
     void resendMessages(ClientConnectionPtr cnx);
 
-    void refreshEncryptionKey(const boost::system::error_code& ec);
+    void refreshEncryptionKey(const ASIO_ERROR& ec);
     bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& 
payload,
                         SharedBuffer& encryptedPayload);
 
@@ -183,8 +190,8 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
     std::string schemaVersion_;
 
     DeadlineTimerPtr sendTimer_;
-    void handleSendTimeout(const boost::system::error_code& err);
-    using DurationType = typename boost::asio::deadline_timer::duration_type;
+    void handleSendTimeout(const ASIO_ERROR& err);
+    using DurationType = TimeDuration;
     void asyncWaitSendTimeout(DurationType expiryTime);
 
     Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index d026e42..9c920da 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -30,6 +30,7 @@
 #include "Future.h"
 #include "LogUtils.h"
 #include "ResultUtils.h"
+#include "TimeUtils.h"
 
 namespace pulsar {
 
@@ -43,9 +44,8 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
                        DeadlineTimerPtr timer)
         : name_(name),
           func_(std::move(func)),
-          timeout_(boost::posix_time::seconds(timeoutSeconds)),
-          backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
-                   boost::posix_time::milliseconds(0)),
+          timeout_(std::chrono::seconds(timeoutSeconds)),
+          backoff_(std::chrono::milliseconds(100), timeout_ + timeout_, 
std::chrono::milliseconds(0)),
           timer_(timer) {}
 
    public:
@@ -67,7 +67,7 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
 
     void cancel() {
         promise_.setFailed(ResultDisconnected);
-        boost::system::error_code ec;
+        ASIO_ERROR ec;
         timer_->cancel(ec);
     }
 
@@ -100,7 +100,7 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
                 promise_.setFailed(result);
                 return;
             }
-            if (remainingTime.total_milliseconds() <= 0) {
+            if (toMillis(remainingTime) <= 0) {
                 promise_.setFailed(ResultTimeout);
                 return;
             }
@@ -109,24 +109,23 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
             timer_->expires_from_now(delay);
 
             auto nextRemainingTime = remainingTime - delay;
-            LOG_INFO("Reschedule " << name_ << " for " << 
delay.total_milliseconds()
-                                   << " ms, remaining time: " << 
nextRemainingTime.total_milliseconds()
-                                   << " ms");
-            timer_->async_wait([this, weakSelf, nextRemainingTime](const 
boost::system::error_code& ec) {
+            LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay)
+                                   << " ms, remaining time: " << 
toMillis(nextRemainingTime) << " ms");
+            timer_->async_wait([this, weakSelf, nextRemainingTime](const 
ASIO_ERROR& ec) {
                 auto self = weakSelf.lock();
                 if (!self) {
                     return;
                 }
                 if (ec) {
-                    if (ec == boost::asio::error::operation_aborted) {
+                    if (ec == ASIO::error::operation_aborted) {
                         LOG_DEBUG("Timer for " << name_ << " is cancelled");
                         promise_.setFailed(ResultTimeout);
                     } else {
                         LOG_WARN("Timer for " << name_ << " failed: " << 
ec.message());
                     }
                 } else {
-                    LOG_DEBUG("Run operation " << name_ << ", remaining time: "
-                                               << 
nextRemainingTime.total_milliseconds() << " ms");
+                    LOG_DEBUG("Run operation " << name_ << ", remaining time: 
" << toMillis(nextRemainingTime)
+                                               << " ms");
                     runImpl(nextRemainingTime);
                 }
             });
diff --git a/lib/RoundRobinMessageRouter.cc b/lib/RoundRobinMessageRouter.cc
index 9693cc2..1bc0b30 100644
--- a/lib/RoundRobinMessageRouter.cc
+++ b/lib/RoundRobinMessageRouter.cc
@@ -26,8 +26,7 @@
 namespace pulsar {
 
RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme
 hashingScheme,
                                                  bool batchingEnabled, 
uint32_t maxBatchingMessages,
-                                                 uint32_t maxBatchingSize,
-                                                 
boost::posix_time::time_duration maxBatchingDelay)
+                                                 uint32_t maxBatchingSize, 
TimeDuration maxBatchingDelay)
     : MessageRouterBase(hashingScheme),
       batchingEnabled_(batchingEnabled),
       maxBatchingMessages_(maxBatchingMessages),
@@ -74,7 +73,7 @@ int RoundRobinMessageRouter::getPartition(const Message& msg, 
const TopicMetadat
     int64_t now = TimeUtils::currentTimeMillis();
 
     if (messageCount >= maxBatchingMessages_ || (messageSize >= 
maxBatchingSize_ - batchSize) ||
-        (now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds())) 
{
+        (now - lastPartitionChange >= toMillis(maxBatchingDelay_))) {
         uint32_t currentPartitionCursor = ++currentPartitionCursor_;
         lastPartitionChange_ = now;
         cumulativeBatchSize_ = messageSize;
diff --git a/lib/RoundRobinMessageRouter.h b/lib/RoundRobinMessageRouter.h
index 753573a..03cfac8 100644
--- a/lib/RoundRobinMessageRouter.h
+++ b/lib/RoundRobinMessageRouter.h
@@ -23,16 +23,16 @@
 #include <pulsar/TopicMetadata.h>
 
 #include <atomic>
-#include <boost/date_time/posix_time/posix_time.hpp>
 
 #include "MessageRouterBase.h"
+#include "TimeUtils.h"
 
 namespace pulsar {
 class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase {
    public:
     RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, bool batchingEnabled,
                             uint32_t maxBatchingMessages, uint32_t maxBatchingSize,
-                            boost::posix_time::time_duration maxBatchingDelay);
+                            TimeDuration maxBatchingDelay);
     virtual ~RoundRobinMessageRouter();
     virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata);
 
@@ -40,7 +40,7 @@ class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase {
     const bool batchingEnabled_;
     const uint32_t maxBatchingMessages_;
     const uint32_t maxBatchingSize_;
-    const boost::posix_time::time_duration maxBatchingDelay_;
+    const TimeDuration maxBatchingDelay_;
 
     std::atomic<uint32_t> currentPartitionCursor_;
     std::atomic<int64_t> lastPartitionChange_;
diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h
index 7ee2618..26fc59e 100644
--- a/lib/SharedBuffer.h
+++ b/lib/SharedBuffer.h
@@ -22,12 +22,19 @@
 #include <assert.h>
 
 #include <array>
+#ifdef USE_ASIO
+#include <asio/buffer.hpp>
+#include <asio/detail/socket_ops.hpp>
+#else
 #include <boost/asio/buffer.hpp>
 #include <boost/asio/detail/socket_ops.hpp>
+#endif
 #include <memory>
 #include <string>
 #include <utility>
 
+#include "AsioDefines.h"
+
 namespace pulsar {
 
 class SharedBuffer {
@@ -144,13 +151,13 @@ class SharedBuffer {
 
     inline bool writable() const { return writableBytes() > 0; }
 
-    boost::asio::const_buffers_1 const_asio_buffer() const {
-        return boost::asio::const_buffers_1(ptr_ + readIdx_, readableBytes());
+    ASIO::const_buffers_1 const_asio_buffer() const {
+        return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes());
     }
 
-    boost::asio::mutable_buffers_1 asio_buffer() {
+    ASIO::mutable_buffers_1 asio_buffer() {
         assert(data_);
-        return boost::asio::buffer(ptr_ + writeIdx_, writableBytes());
+        return ASIO::buffer(ptr_ + writeIdx_, writableBytes());
     }
 
     void write(const char* data, uint32_t size) {
@@ -239,17 +246,17 @@ class CompositeSharedBuffer {
     }
 
     // Implement the ConstBufferSequence requirements.
-    typedef boost::asio::const_buffer value_type;
-    typedef boost::asio::const_buffer* iterator;
-    typedef const boost::asio::const_buffer* const_iterator;
+    typedef ASIO::const_buffer value_type;
+    typedef ASIO::const_buffer* iterator;
+    typedef const ASIO::const_buffer* const_iterator;
 
-    const boost::asio::const_buffer* begin() const { return &(asioBuffers_.at(0)); }
+    const ASIO::const_buffer* begin() const { return &(asioBuffers_.at(0)); }
 
-    const boost::asio::const_buffer* end() const { return begin() + Size; }
+    const ASIO::const_buffer* end() const { return begin() + Size; }
 
    private:
     std::array<SharedBuffer, Size> sharedBuffers_;
-    std::array<boost::asio::const_buffer, Size> asioBuffers_;
+    std::array<ASIO::const_buffer, Size> asioBuffers_;
 };
 
 typedef CompositeSharedBuffer<2> PairSharedBuffer;
diff --git a/lib/TimeUtils.h b/lib/TimeUtils.h
index a55773d..03f563c 100644
--- a/lib/TimeUtils.h
+++ b/lib/TimeUtils.h
@@ -21,19 +21,23 @@
 #include <pulsar/defines.h>
 
 #include <atomic>
-#include <boost/date_time/posix_time/posix_time.hpp>
 #include <chrono>
 
 namespace pulsar {
 
-using namespace boost::posix_time;
-using boost::posix_time::milliseconds;
-using boost::posix_time::seconds;
+using ptime = decltype(std::chrono::high_resolution_clock::now());
+using TimeDuration = std::chrono::nanoseconds;
+
+inline decltype(std::chrono::milliseconds(0).count()) toMillis(TimeDuration duration) {
+    return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
+}
 
 class PULSAR_PUBLIC TimeUtils {
    public:
-    static ptime now();
-    static int64_t currentTimeMillis();
+    static ptime now() { return std::chrono::high_resolution_clock::now(); }
+    static int64_t currentTimeMillis() {
+        return std::chrono::duration_cast<std::chrono::milliseconds>(now().time_since_epoch()).count();
+    }
 };
 
 // This class processes a timeout with the following semantics:
diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc
index 061a140..e371af9 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -34,9 +34,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
     timeoutHandlerHelper();
     ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
     timer_ = executorService->createDeadlineTimer();
-    timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
+    timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_));
     std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
-    timer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+    timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (self && !ec) {
             self->timeoutHandler();
@@ -173,7 +173,7 @@ void UnAckedMessageTrackerEnabled::clear() {
 }
 
 void UnAckedMessageTrackerEnabled::stop() {
-    boost::system::error_code ec;
+    ASIO_ERROR ec;
     if (timer_) {
         timer_->cancel(ec);
     }
diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h
index 6181a8a..83edc4c 100644
--- a/lib/UnAckedMessageTrackerEnabled.h
+++ b/lib/UnAckedMessageTrackerEnabled.h
@@ -18,13 +18,13 @@
  */
 #ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
 #define LIB_UNACKEDMESSAGETRACKERENABLED_H_
-#include <boost/asio/deadline_timer.hpp>
 #include <deque>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <set>
 
+#include "AsioTimer.h"
 #include "TestUtil.h"
 #include "UnAckedMessageTrackerInterface.h"
 
@@ -33,7 +33,6 @@ namespace pulsar {
 class ClientImpl;
 class ConsumerImplBase;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 
 class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this<UnAckedMessageTrackerEnabled>,
                                      public UnAckedMessageTrackerInterface {
diff --git a/lib/auth/athenz/ZTSClient.cc b/lib/auth/athenz/ZTSClient.cc
index 230713e..35387d9 100644
--- a/lib/auth/athenz/ZTSClient.cc
+++ b/lib/auth/athenz/ZTSClient.cc
@@ -44,8 +44,6 @@ namespace ptree = boost::property_tree;
 #pragma clang diagnostic ignored "-Wunknown-warning-option"
 #endif
 
-#include <boost/xpressive/xpressive.hpp>
-
 #if defined(__clang__)
 #pragma clang diagnostic pop
 #endif
diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc
index 056dbf6..0eefabd 100644
--- a/lib/stats/ConsumerStatsImpl.cc
+++ b/lib/stats/ConsumerStatsImpl.cc
@@ -46,7 +46,7 @@ ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats)
       totalAckedMsgMap_(stats.totalAckedMsgMap_),
       statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {}
 
-void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
+void ConsumerStatsImpl::flushAndReset(const ASIO_ERROR& ec) {
     if (ec) {
         LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
         return;
@@ -85,9 +85,9 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy
 }
 
 void ConsumerStatsImpl::scheduleTimer() {
-    timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
+    timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
     std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
-    timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
+    timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (!self) {
             return;
diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h
index 44f927f..03f3a47 100644
--- a/lib/stats/ConsumerStatsImpl.h
+++ b/lib/stats/ConsumerStatsImpl.h
@@ -20,17 +20,16 @@
 #ifndef PULSAR_CONSUMER_STATS_IMPL_H_
 #define PULSAR_CONSUMER_STATS_IMPL_H_
 
-#include <boost/asio/deadline_timer.hpp>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <utility>
 
 #include "ConsumerStatsBase.h"
+#include "lib/AsioTimer.h"
 #include "lib/ExecutorService.h"
 namespace pulsar {
 
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
 
@@ -58,7 +57,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>
    public:
     ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int);
     ConsumerStatsImpl(const ConsumerStatsImpl& stats);
-    void flushAndReset(const boost::system::error_code&);
+    void flushAndReset(const ASIO_ERROR&);
     void start() override;
     void receivedMessage(Message&, Result) override;
     void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
diff --git a/lib/stats/ProducerStatsBase.h b/lib/stats/ProducerStatsBase.h
index fe0ba0a..b24266e 100644
--- a/lib/stats/ProducerStatsBase.h
+++ b/lib/stats/ProducerStatsBase.h
@@ -22,14 +22,14 @@
 #include <pulsar/Message.h>
 #include <pulsar/Result.h>
 
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include "lib/TimeUtils.h"
 
 namespace pulsar {
 class ProducerStatsBase {
    public:
     virtual void start() {}
     virtual void messageSent(const Message& msg) = 0;
-    virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0;
+    virtual void messageReceived(Result, const ptime&) = 0;
     virtual ~ProducerStatsBase(){};
 };
 
diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/stats/ProducerStatsDisabled.h
index df1df0f..df1da78 100644
--- a/lib/stats/ProducerStatsDisabled.h
+++ b/lib/stats/ProducerStatsDisabled.h
@@ -25,7 +25,7 @@ namespace pulsar {
 class ProducerStatsDisabled : public ProducerStatsBase {
    public:
     virtual void messageSent(const Message& msg){};
-    virtual void messageReceived(Result, const boost::posix_time::ptime&){};
+    virtual void messageReceived(Result, const ptime&){};
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_STATS_DISABLED_HEADER
diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc
index 3d3629d..15e9e67 100644
--- a/lib/stats/ProducerStatsImpl.cc
+++ b/lib/stats/ProducerStatsImpl.cc
@@ -20,9 +20,11 @@
 #include "ProducerStatsImpl.h"
 
 #include <array>
+#include <chrono>
 
 #include "lib/ExecutorService.h"
 #include "lib/LogUtils.h"
+#include "lib/TimeUtils.h"
 #include "lib/Utils.h"
 
 namespace pulsar {
@@ -65,7 +67,7 @@ ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
 
 void ProducerStatsImpl::start() { scheduleTimer(); }
 
-void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
+void ProducerStatsImpl::flushAndReset(const ASIO_ERROR& ec) {
     if (ec) {
         LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
         return;
@@ -93,9 +95,10 @@ void ProducerStatsImpl::messageSent(const Message& msg) {
     totalBytesSent_ += msg.getLength();
 }
 
-void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) {
-    boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time();
-    double diffInMicros = (currentTime - publishTime).total_microseconds();
+void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) {
+    auto currentTime = TimeUtils::now();
+    double diffInMicros =
+        std::chrono::duration_cast<std::chrono::microseconds>(currentTime - publishTime).count();
     std::lock_guard<std::mutex> lock(mutex_);
     totalLatencyAccumulator_(diffInMicros);
     latencyAccumulator_(diffInMicros);
@@ -106,9 +109,9 @@ void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::pti
 ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }
 
 void ProducerStatsImpl::scheduleTimer() {
-    timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
+    timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
     std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
-    timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
+    timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (!self) {
             return;
diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h
index 8cd1099..5d445c6 100644
--- a/lib/stats/ProducerStatsImpl.h
+++ b/lib/stats/ProducerStatsImpl.h
@@ -30,20 +30,18 @@
 #include <boost/accumulators/framework/features.hpp>
 #include <boost/accumulators/statistics.hpp>
 #include <boost/accumulators/statistics/extended_p_square.hpp>
-#include <boost/asio/deadline_timer.hpp>
-#include <boost/date_time/local_time/local_time.hpp>
 #include <iostream>
 #include <memory>
 #include <mutex>
 #include <vector>
 
 #include "ProducerStatsBase.h"
+#include "lib/AsioTimer.h"
 
 namespace pulsar {
 
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 
 typedef boost::accumulators::accumulator_set<
     double,
@@ -83,11 +81,11 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>
 
     void start() override;
 
-    void flushAndReset(const boost::system::error_code&);
+    void flushAndReset(const ASIO_ERROR&);
 
     void messageSent(const Message&) override;
 
-    void messageReceived(Result, const boost::posix_time::ptime&) override;
+    void messageReceived(Result, const ptime&) override;
 
     ~ProducerStatsImpl();
 
diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc
index 9fd048b..b091f97 100644
--- a/tests/AuthPluginTest.cc
+++ b/tests/AuthPluginTest.cc
@@ -21,9 +21,14 @@
 #include <pulsar/Client.h>
 
 #include <boost/algorithm/string.hpp>
+#ifdef USE_ASIO
+#include <asio.hpp>
+#else
 #include <boost/asio.hpp>
+#endif
 #include <thread>
 
+#include "lib/AsioDefines.h"
 #include "lib/Future.h"
 #include "lib/Latch.h"
 #include "lib/LogUtils.h"
@@ -287,10 +292,9 @@ namespace testAthenz {
 std::string principalToken;
 void mockZTS(Latch& latch, int port) {
     LOG_INFO("-- MockZTS started");
-    boost::asio::io_service io;
-    boost::asio::ip::tcp::iostream stream;
-    boost::asio::ip::tcp::acceptor acceptor(io,
-                                            boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port));
+    ASIO::io_service io;
+    ASIO::ip::tcp::iostream stream;
+    ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port));
 
     LOG_INFO("-- MockZTS waiting for connnection");
     latch.countdown();
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index a04da08..7595f44 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -22,7 +22,6 @@
 #include <pulsar/Client.h>
 
 #include <boost/algorithm/string.hpp>
-#include <boost/asio.hpp>
 #include <fstream>
 #include <streambuf>
 #include <string>
diff --git a/tests/BackoffTest.cc b/tests/BackoffTest.cc
index d066b94..5fe4f71 100644
--- a/tests/BackoffTest.cc
+++ b/tests/BackoffTest.cc
@@ -26,42 +26,42 @@
 #include "lib/stats/ProducerStatsImpl.h"
 
 using namespace pulsar;
-using boost::posix_time::milliseconds;
-using boost::posix_time::seconds;
+using std::chrono::milliseconds;
+using std::chrono::seconds;
 
 static bool checkExactAndDecrementTimer(Backoff& backoff, const unsigned int& t2) {
-    const unsigned int& t1 = backoff.next().total_milliseconds();
-    boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
+    auto t1 = toMillis(backoff.next());
+    auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
     firstBackOffTime -= milliseconds(t2);
     return t1 == t2;
 }
 
 static bool withinTenPercentAndDecrementTimer(Backoff& backoff, const unsigned int& t2) {
-    const unsigned int& t1 = backoff.next().total_milliseconds();
-    boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
+    auto t1 = toMillis(backoff.next());
+    auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
     firstBackOffTime -= milliseconds(t2);
     return (t1 >= t2 * 0.9 && t1 <= t2);
 }
 
 TEST(BackoffTest, mandatoryStopTestNegativeTest) {
     Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
-    ASSERT_EQ(backoff.next().total_milliseconds(), 100);
-    backoff.next().total_milliseconds();  // 200
-    backoff.next().total_milliseconds();  // 400
-    backoff.next().total_milliseconds();  // 800
+    ASSERT_EQ(toMillis(backoff.next()), 100);
+    backoff.next();  // 200
+    backoff.next();  // 400
+    backoff.next();  // 800
     ASSERT_FALSE(withinTenPercentAndDecrementTimer(backoff, 400));
 }
 
 TEST(BackoffTest, firstBackoffTimerTest) {
     Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
-    ASSERT_EQ(backoff.next().total_milliseconds(), 100);
-    boost::posix_time::ptime firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
+    ASSERT_EQ(toMillis(backoff.next()), 100);
+    auto firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
     std::this_thread::sleep_for(std::chrono::milliseconds(300));
     TimeDuration diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime;
     ASSERT_EQ(diffBackOffTime, milliseconds(0));  // no change since reset not called
 
     backoff.reset();
-    ASSERT_EQ(backoff.next().total_milliseconds(), 100);
+    ASSERT_EQ(toMillis(backoff.next()), 100);
     diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime;
     ASSERT_TRUE(diffBackOffTime >= milliseconds(300) && diffBackOffTime < seconds(1));
 }
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index c7e98bc..f97457f 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -955,7 +955,7 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     auto elapsed = TimeUtils::now() - start;
 
     // getLastMessageIdAsync should be blocked until operationTimeout when the connection is disconnected.
-    ASSERT_GE(elapsed.seconds(), operationTimeout);
+    ASSERT_GE(std::chrono::duration_cast<std::chrono::seconds>(elapsed).count(), operationTimeout);
 }
 
 TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index c2863e8..7377884 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -170,7 +170,7 @@ class PulsarFriend {
         handler.connection_ = conn;
     }
 
-    static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
+    static auto getFirstBackoffTime(Backoff& backoff) -> decltype(backoff.firstBackoffTime_)& {
         return backoff.firstBackoffTime_;
     }
 
diff --git a/tests/RoundRobinMessageRouterTest.cc b/tests/RoundRobinMessageRouterTest.cc
index 56a7605..145c45a 100644
--- a/tests/RoundRobinMessageRouterTest.cc
+++ b/tests/RoundRobinMessageRouterTest.cc
@@ -31,7 +31,7 @@ TEST(RoundRobinMessageRouterTest, onePartition) {
     const int numPartitions = 1;
 
     RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1,
-                                   boost::posix_time::milliseconds(0));
+                                   std::chrono::milliseconds(0));
 
     Message msg1 = MessageBuilder().setPartitionKey("my-key-1").setContent("one").build();
     Message msg2 = MessageBuilder().setPartitionKey("my-key-2").setContent("two").build();
@@ -49,7 +49,7 @@ TEST(RoundRobinMessageRouterTest, sameKey) {
     const int numPartitions = 13;
 
     RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1,
-                                   boost::posix_time::milliseconds(0));
+                                   std::chrono::milliseconds(0));
 
     Message msg1 = MessageBuilder().setPartitionKey("my-key").setContent("one").build();
     Message msg2 = MessageBuilder().setPartitionKey("my-key").setContent("two").build();
@@ -63,7 +63,7 @@ TEST(RoundRobinMessageRouterTest, batchingDisabled) {
     const int numPartitions = 13;
 
     RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1,
-                                   boost::posix_time::milliseconds(0));
+                                   std::chrono::milliseconds(0));
 
     Message msg1 = MessageBuilder().setContent("one").build();
     Message msg2 = MessageBuilder().setContent("two").build();
@@ -77,7 +77,7 @@ TEST(RoundRobinMessageRouterTest, batchingEnabled) {
     const int numPartitions = 13;
 
     RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000,
-                                   boost::posix_time::seconds(1));
+                                   std::chrono::seconds(1));
 
     int p = -1;
     for (int i = 0; i < 100; i++) {
@@ -96,7 +96,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) {
     const int numPartitions = 13;
 
     RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000,
-                                   boost::posix_time::seconds(1));
+                                   std::chrono::seconds(1));
 
     int p1 = -1;
     for (int i = 0; i < 100; i++) {
@@ -132,8 +132,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) {
 TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) {
     const int numPartitions = 13;
 
-    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000,
-                                   boost::posix_time::seconds(1));
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000, std::chrono::seconds(1));
 
     Message msg1 = MessageBuilder().setContent("one").build();
     Message msg2 = MessageBuilder().setContent("two").build();
@@ -150,8 +149,7 @@ TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) {
 TEST(RoundRobinMessageRouterTest, maxBatchSize) {
     const int numPartitions = 13;
 
-    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8,
-                                   boost::posix_time::seconds(1));
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8, std::chrono::seconds(1));
 
     Message msg1 = MessageBuilder().setContent("one").build();
     Message msg2 = MessageBuilder().setContent("two").build();
diff --git a/vcpkg.json b/vcpkg.json
index d023a40..5ff4410 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -5,45 +5,20 @@
   "builtin-baseline": "b051745c68faa6f65c493371d564c4eb8af34dad",
   "dependencies": [
     {
-      "name": "boost-accumulators",
-      "version>=": "1.83.0"
-    },
-    {
-      "name": "boost-algorithm",
-      "version>=": "1.83.0"
-    },
-    {
-      "name": "boost-any",
-      "version>=": "1.83.0"
-    },
-    {
-      "name": "boost-asio",
-      "version>=": "1.83.0"
-    },
-    {
-      "name": "boost-circular-buffer",
-      "version>=": "1.83.0"
-    },
-    {
-      "name": "boost-date-time",
-      "version>=": "1.83.0"
+      "name": "asio",
+      "features": [
+        "openssl"
+      ],
+      "version>=": "1.28.2"
     },
     {
-      "name": "boost-predef",
+      "name": "boost-accumulators",
       "version>=": "1.83.0"
     },
     {
       "name": "boost-property-tree",
       "version>=": "1.83.0"
     },
-    {
-      "name": "boost-serialization",
-      "version>=": "1.83.0"
-    },
-    {
-      "name": "boost-xpressive",
-      "version>=": "1.83.0"
-    },
     {
       "name": "curl",
       "default-features": false,

Reply via email to