This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7d401a589ab5d4142cac53510daa27930a52b5fb
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Jun 26 02:36:50 2021 +0800

    [C++][Python] Add connection timeout configuration (#11029)
    
    Fixes #10747
    
    ### Motivation
    
    This PR is a catch-up of https://github.com/apache/pulsar/pull/2852 and adds 
connection timeout configuration to the C++ and Python clients.
    
    ### Modifications
    
    - Add a `PeriodicTask` class to execute tasks periodically and the related 
unit tests: `PeriodicTaskTest`.
    - Use `PeriodicTask` to register a timer before connecting to the broker 
asynchronously. If the connection has not been established when the timer is 
triggered, close the socket so that `handleTcpConnected` is invoked 
immediately with a failure.
    - Add connection timeout (in milliseconds) to both C++ and Python clients.
    - Add `ClientTest.testConnectTimeout` (C++) and `test_connect_timeout` 
(Python) to verify that the connection timeout works.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
    - PeriodicTaskTest
    - ClientTest.testConnectTimeout
    - test_connect_timeout
    
    (cherry picked from commit 6062d2fd70936a691b5a2369580eaab01fe033f3)
---
 .../include/pulsar/ClientConfiguration.h           | 16 +++++
 pulsar-client-cpp/lib/ClientConfiguration.cc       |  8 +++
 pulsar-client-cpp/lib/ClientConfigurationImpl.h    |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc          | 28 +++++++-
 pulsar-client-cpp/lib/ClientConnection.h           |  2 +
 pulsar-client-cpp/lib/ClientImpl.cc                |  3 +-
 pulsar-client-cpp/lib/ExecutorService.h            |  2 +
 pulsar-client-cpp/lib/PeriodicTask.cc              | 60 +++++++++++++++++
 pulsar-client-cpp/lib/PeriodicTask.h               | 76 ++++++++++++++++++++++
 pulsar-client-cpp/python/pulsar/__init__.py        |  7 +-
 pulsar-client-cpp/python/pulsar_test.py            | 18 ++++-
 pulsar-client-cpp/python/src/config.cc             |  2 +
 pulsar-client-cpp/tests/ClientTest.cc              | 28 ++++++++
 pulsar-client-cpp/tests/PeriodicTaskTest.cc        | 75 +++++++++++++++++++++
 14 files changed, 322 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index 11bfc43..2f2c461 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -244,6 +244,22 @@ class PULSAR_PUBLIC ClientConfiguration {
      */
     unsigned int getPartitionsUpdateInterval() const;
 
+    /**
+     * Set the duration of time to wait for a connection to a broker to be 
established. If the duration passes
+     * without a response from the broker, the connection attempt is dropped.
+     *
+     * Default: 10000
+     *
+     * @param timeoutMs the duration in milliseconds
+     * @return
+     */
+    ClientConfiguration& setConnectionTimeout(int timeoutMs);
+
+    /**
+     * The getter associated with setConnectionTimeout().
+     */
+    int getConnectionTimeout() const;
+
     friend class ClientImpl;
     friend class PulsarWrapper;
 
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc 
b/pulsar-client-cpp/lib/ClientConfiguration.cc
index 52072a3..188980a 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -140,4 +140,12 @@ ClientConfiguration& 
ClientConfiguration::setListenerName(const std::string& lis
 }
 
 const std::string& ClientConfiguration::getListenerName() const { return 
impl_->listenerName; }
+
+ClientConfiguration& ClientConfiguration::setConnectionTimeout(int timeoutMs) {
+    impl_->connectionTimeoutMs = timeoutMs;
+    return *this;
+}
+
+int ClientConfiguration::getConnectionTimeout() const { return 
impl_->connectionTimeoutMs; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h 
b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
index 631e8ae..887ecf2 100644
--- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
@@ -39,6 +39,7 @@ struct ClientConfigurationImpl {
     bool validateHostName{false};
     unsigned int partitionsUpdateInterval{60};  // 1 minute
     std::string listenerName;
+    int connectionTimeoutMs{10000};  // 10 seconds
 
     std::unique_ptr<LoggerFactory> takeLogger() { return 
std::move(loggerFactory); }
 };
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 58a3cce..3b07115 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -175,6 +175,8 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
       error_(boost::system::error_code()),
       incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
       incomingCmd_(),
+      
connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
+                                                         
clientConfiguration.getConnectionTimeout())),
       pendingWriteBuffers_(),
       pendingWriteOperations_(0),
       outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
@@ -374,6 +376,7 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
             LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical 
broker: " << logicalAddress_);
         }
         state_ = TcpConnected;
+        connectTimeoutTask_->stop();
         socket_->set_option(tcp::no_delay(true));
 
         socket_->set_option(tcp::socket::keep_alive(true));
@@ -414,7 +417,13 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
         }
     } else if (endpointIterator != tcp::resolver::iterator()) {
         // The connection failed. Try the next endpoint in the list.
-        socket_->close();
+        boost::system::error_code err;
+        socket_->close(err);  // ignore the error of close
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close socket: " << 
err.message());
+        }
+        connectTimeoutTask_->stop();
+        connectTimeoutTask_->start();
         tcp::endpoint endpoint = *endpointIterator;
         socket_->async_connect(endpoint, 
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
                                                    std::placeholders::_1, 
++endpointIterator));
@@ -500,6 +509,18 @@ void ClientConnection::handleResolve(const 
boost::system::error_code& err,
         return;
     }
 
+    auto self = shared_from_this();
+    connectTimeoutTask_->setCallback([this, self](const 
PeriodicTask::ErrorCode& ec) {
+        if (state_ != TcpConnected) {
+            LOG_ERROR(cnxString_ << "Connection was not established in " << 
connectTimeoutTask_->getPeriodMs()
+                                 << " ms, close the socket");
+            PeriodicTask::ErrorCode ignoredError;
+            socket_->close(ignoredError);
+        }
+        connectTimeoutTask_->stop();
+    });
+
+    connectTimeoutTask_->start();
     if (endpointIterator != tcp::resolver::iterator()) {
         LOG_DEBUG(cnxString_ << "Resolved hostname " << 
endpointIterator->host_name()  //
                              << " to " << endpointIterator->endpoint());
@@ -1412,6 +1433,9 @@ void ClientConnection::close() {
     state_ = Disconnected;
     boost::system::error_code err;
     socket_->close(err);
+    if (err) {
+        LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+    }
 
     if (tlsSocket_) {
         tlsSocket_->lowest_layer().close();
@@ -1442,6 +1466,8 @@ void ClientConnection::close() {
         consumerStatsRequestTimer_.reset();
     }
 
+    connectTimeoutTask_->stop();
+
     lock.unlock();
     LOG_INFO(cnxString_ << "Connection closed");
 
diff --git a/pulsar-client-cpp/lib/ClientConnection.h 
b/pulsar-client-cpp/lib/ClientConnection.h
index 71db1ad..a20219c 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -45,6 +45,7 @@
 #include <pulsar/Client.h>
 #include <set>
 #include <lib/BrokerConsumerStatsImpl.h>
+#include "lib/PeriodicTask.h"
 
 using namespace pulsar;
 
@@ -283,6 +284,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     proto::BaseCommand incomingCmd_;
 
     Promise<Result, ClientConnectionWeakPtr> connectPromise_;
+    std::shared_ptr<PeriodicTask> connectTimeoutTask_;
 
     typedef std::map<long, PendingRequestData> PendingRequestsMap;
     PendingRequestsMap pendingRequests_;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index d9a2b57..b93ad2d 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -480,8 +480,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
     state_ = Closing;
     lock.unlock();
 
-    LOG_INFO("Closing Pulsar client");
     SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + 
consumers.size());
+    LOG_INFO("Closing Pulsar client with " << producers.size() << " producers 
and " << consumers.size()
+                                           << " consumers");
 
     for (ProducersList::iterator it = producers.begin(); it != 
producers.end(); ++it) {
         ProducerImplBasePtr producer = it->lock();
diff --git a/pulsar-client-cpp/lib/ExecutorService.h 
b/pulsar-client-cpp/lib/ExecutorService.h
index b673b79..d0ffc23 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -47,6 +47,8 @@ class PULSAR_PUBLIC ExecutorService : private 
boost::noncopyable {
     void postWork(std::function<void(void)> task);
     void close();
 
+    boost::asio::io_service &getIOService() { return *io_service_; }
+
    private:
     /*
      *  only called once and within lock so no need to worry about 
thread-safety
diff --git a/pulsar-client-cpp/lib/PeriodicTask.cc 
b/pulsar-client-cpp/lib/PeriodicTask.cc
new file mode 100644
index 0000000..f25a175
--- /dev/null
+++ b/pulsar-client-cpp/lib/PeriodicTask.cc
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "lib/PeriodicTask.h"
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+namespace pulsar {
+
+void PeriodicTask::start() {
+    if (state_ != Pending) {
+        return;
+    }
+    state_ = Ready;
+    if (periodMs_ >= 0) {
+        auto self = shared_from_this();
+        timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
+        timer_.async_wait([this, self](const ErrorCode& ec) { 
handleTimeout(ec); });
+    }
+}
+
+void PeriodicTask::stop() {
+    State state = Ready;
+    if (!state_.compare_exchange_strong(state, Closing)) {
+        return;
+    }
+    timer_.cancel();
+    state_ = Pending;
+}
+
+void PeriodicTask::handleTimeout(const ErrorCode& ec) {
+    if (state_ != Ready) {
+        return;
+    }
+
+    callback_(ec);
+
+    // state_ may be changed in handleTimeout, so we check state_ again
+    if (state_ == Ready) {
+        auto self = shared_from_this();
+        timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
+        timer_.async_wait([this, self](const ErrorCode& ec) { 
handleTimeout(ec); });
+    }
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PeriodicTask.h 
b/pulsar-client-cpp/lib/PeriodicTask.h
new file mode 100644
index 0000000..57d0734
--- /dev/null
+++ b/pulsar-client-cpp/lib/PeriodicTask.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include <boost/asio.hpp>
+
+namespace pulsar {
+
+/**
+ * A task that is executed periodically.
+ *
+ * After the `start()` method is called, it will trigger `callback_` method 
periodically whose interval is
+ * `periodMs` in the constructor. After the `stop()` method is called, the 
timer will be cancelled and
+ * `callback()` will never be called again unless `start()` was called again.
+ *
+ * If you don't want to execute the task infinitely, you can call `stop()` in 
the implementation of
+ * `callback()` method.
+ *
+ * NOTE: If the `periodMs` is negative, the `callback()` will never be called.
+ */
+class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
+   public:
+    using ErrorCode = boost::system::error_code;
+    using CallbackType = std::function<void(const ErrorCode&)>;
+
+    enum State : std::uint8_t
+    {
+        Pending,
+        Ready,
+        Closing
+    };
+
+    PeriodicTask(boost::asio::io_service& ioService, int periodMs) : 
timer_(ioService), periodMs_(periodMs) {}
+
+    void start();
+
+    void stop();
+
+    void setCallback(CallbackType callback) noexcept { callback_ = callback; }
+
+    State getState() const noexcept { return state_; }
+    int getPeriodMs() const noexcept { return periodMs_; }
+
+   private:
+    std::atomic<State> state_{Pending};
+    boost::asio::deadline_timer timer_;
+    const int periodMs_;
+    CallbackType callback_{trivialCallback};
+
+    void handleTimeout(const ErrorCode& ec);
+
+    static void trivialCallback(const ErrorCode&) {}
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 93df3b5..514ca11 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -363,7 +363,8 @@ class Client:
                  tls_trust_certs_file_path=None,
                  tls_allow_insecure_connection=False,
                  tls_validate_hostname=False,
-                 logger=None
+                 logger=None,
+                 connection_timeout_ms=10000,
                  ):
         """
         Create a new Pulsar client instance.
@@ -409,10 +410,13 @@ class Client:
           the endpoint.
         * `logger`:
           Set a Python logger for this Pulsar client. Should be an instance of 
`logging.Logger`.
+        * `connection_timeout_ms`:
+          Set timeout in milliseconds on TCP connections.
         """
         _check_type(str, service_url, 'service_url')
         _check_type_or_none(Authentication, authentication, 'authentication')
         _check_type(int, operation_timeout_seconds, 
'operation_timeout_seconds')
+        _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
         _check_type(int, io_threads, 'io_threads')
         _check_type(int, message_listener_threads, 'message_listener_threads')
         _check_type(int, concurrent_lookup_requests, 
'concurrent_lookup_requests')
@@ -427,6 +431,7 @@ class Client:
         if authentication:
             conf.authentication(authentication.auth)
         conf.operation_timeout_seconds(operation_timeout_seconds)
+        conf.connection_timeout(connection_timeout_ms)
         conf.io_threads(io_threads)
         conf.message_listener_threads(message_listener_threads)
         conf.concurrent_lookup_requests(concurrent_lookup_requests)
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index dd2b228..fc17c72 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -30,7 +30,7 @@ from pulsar import Client, MessageId, \
             AuthenticationTLS, Authentication, AuthenticationToken, 
InitialPosition, \
             CryptoKeyReader
 
-from _pulsar import ProducerConfiguration, ConsumerConfiguration
+from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError
 
 from schema_test import *
 
@@ -1148,6 +1148,22 @@ class PulsarTest(TestCase):
             consumer.receive(100)
         client.close()
 
+    def test_connect_timeout(self):
+        client = pulsar.Client(
+            service_url='pulsar://192.0.2.1:1234',
+            connection_timeout_ms=1000, # 1 second
+        )
+        t1 = time.time()
+        try:
+            producer = client.create_producer('test_connect_timeout')
+            self.fail('create_producer should not succeed')
+        except ConnectError as expected:
+            print('expected error: {} when create producer'.format(expected))
+        t2 = time.time()
+        self.assertGreater(t2 - t1, 1.0)
+        self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough
+        client.close()
+
     def _check_value_error(self, fun):
         with self.assertRaises(ValueError):
             fun()
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 5838208..b665ec7 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -195,6 +195,8 @@ void export_config() {
             .def("authentication", &ClientConfiguration_setAuthentication, 
return_self<>())
             .def("operation_timeout_seconds", 
&ClientConfiguration::getOperationTimeoutSeconds)
             .def("operation_timeout_seconds", 
&ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
+            .def("connection_timeout", 
&ClientConfiguration::getConnectionTimeout)
+            .def("connection_timeout", 
&ClientConfiguration::setConnectionTimeout, return_self<>())
             .def("io_threads", &ClientConfiguration::getIOThreads)
             .def("io_threads", &ClientConfiguration::setIOThreads, 
return_self<>())
             .def("message_listener_threads", 
&ClientConfiguration::getMessageListenerThreads)
diff --git a/pulsar-client-cpp/tests/ClientTest.cc 
b/pulsar-client-cpp/tests/ClientTest.cc
index 91232dc..129a322 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -18,6 +18,7 @@
  */
 #include <gtest/gtest.h>
 
+#include <future>
 #include <pulsar/Client.h>
 #include "../lib/checksum/ChecksumProvider.h"
 
@@ -86,3 +87,30 @@ TEST(ClientTest, testServerConnectError) {
     ASSERT_EQ(ResultConnectError, client.createReader(topic, 
MessageId::earliest(), readerConf, reader));
     client.close();
 }
+
+TEST(ClientTest, testConnectTimeout) {
+    // 192.0.2.0/24 is assigned for documentation, should be a deadend
+    const std::string blackHoleBroker = "pulsar://192.0.2.1:1234";
+    const std::string topic = "test-connect-timeout";
+
+    Client clientLow(blackHoleBroker, 
ClientConfiguration().setConnectionTimeout(1000));
+    Client clientDefault(blackHoleBroker);
+
+    std::promise<Result> promiseLow;
+    clientLow.createProducerAsync(
+        topic, [&promiseLow](Result result, Producer producer) { 
promiseLow.set_value(result); });
+
+    std::promise<Result> promiseDefault;
+    clientDefault.createProducerAsync(
+        topic, [&promiseDefault](Result result, Producer producer) { 
promiseDefault.set_value(result); });
+
+    auto futureLow = promiseLow.get_future();
+    ASSERT_EQ(futureLow.wait_for(std::chrono::milliseconds(1500)), 
std::future_status::ready);
+    ASSERT_EQ(futureLow.get(), ResultConnectError);
+
+    auto futureDefault = promiseDefault.get_future();
+    ASSERT_EQ(futureDefault.wait_for(std::chrono::milliseconds(10)), 
std::future_status::timeout);
+
+    clientLow.close();
+    clientDefault.close();
+}
diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc 
b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
new file mode 100644
index 0000000..11c1c62
--- /dev/null
+++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include "lib/ExecutorService.h"
+#include "lib/LogUtils.h"
+#include "lib/PeriodicTask.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+TEST(PeriodicTaskTest, testCountdownTask) {
+    ExecutorService executor;
+
+    std::atomic_int count{5};
+
+    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), 200);
+    task->setCallback([task, &count](const PeriodicTask::ErrorCode& ec) {
+        if (--count <= 0) {
+            task->stop();
+        }
+        LOG_INFO("Now count is " << count << ", error code: " << ec.message());
+    });
+
+    // Wait for 2 seconds to verify callback won't be triggered after 1 second 
(200 ms * 5)
+    task->start();
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    LOG_INFO("Now count is " << count);
+    ASSERT_EQ(count.load(), 0);
+    task->stop();  // it's redundant, just to verify multiple stop() is 
idempotent
+
+    // Test start again
+    count = 1;
+    task->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds(800));
+    LOG_INFO("Now count is " << count);
+    ASSERT_EQ(count.load(), 0);
+    task->stop();
+
+    executor.close();
+}
+
+TEST(PeriodicTaskTest, testNegativePeriod) {
+    ExecutorService executor;
+
+    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), -1);
+    std::atomic_bool callbackTriggered{false};
+    task->setCallback([&callbackTriggered](const PeriodicTask::ErrorCode& ec) 
{ callbackTriggered = true; });
+
+    task->start();
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    ASSERT_EQ(callbackTriggered.load(), false);
+    task->stop();
+
+    executor.close();
+}

Reply via email to