This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 72b7311  Fix the operation timeout is not honored for GetSchema requests (#383)
72b7311 is described below

commit 72b7311aeef32e28a28e926da686aaf948e8f948
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jan 19 21:31:19 2024 +0800

    Fix the operation timeout is not honored for GetSchema requests (#383)
---
 lib/ClientConfiguration.cc           |  7 +++++--
 lib/ClientConfigurationImpl.h        |  4 +++-
 lib/ClientConnection.cc              | 31 +++++++++++++++++++++++++---
 lib/ClientConnection.h               |  8 +++++++-
 lib/ClientImpl.cc                    |  7 ++++++-
 lib/ClientImpl.h                     |  2 ++
 lib/Future.h                         |  2 +-
 lib/RetryableLookupService.h         | 12 ++++++-----
 lib/RetryableOperation.h             |  7 ++++---
 lib/RetryableOperationCache.h        |  9 ++++----
 tests/LookupServiceTest.cc           | 40 ++++++++++++++++++++++++++++++++----
 tests/PulsarWrapper.h                | 31 ++++++++++++++++++++++++++++
 tests/RetryableOperationCacheTest.cc |  9 ++++----
 13 files changed, 140 insertions(+), 29 deletions(-)

diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 6e7c745..6a91ec1 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <chrono>
 #include <stdexcept>
 
 #include "ClientConfigurationImpl.h"
@@ -61,11 +62,13 @@ Authentication& ClientConfiguration::getAuth() const { 
return *impl_->authentica
 const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return 
impl_->authenticationPtr; }
 
 ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int 
timeout) {
-    impl_->operationTimeoutSeconds = timeout;
+    impl_->operationTimeout = std::chrono::seconds(timeout);
     return *this;
 }
 
-int ClientConfiguration::getOperationTimeoutSeconds() const { return 
impl_->operationTimeoutSeconds; }
+int ClientConfiguration::getOperationTimeoutSeconds() const {
+    return 
std::chrono::duration_cast<std::chrono::seconds>(impl_->operationTimeout).count();
+}
 
 ClientConfiguration& ClientConfiguration::setIOThreads(int threads) {
     impl_->ioThreads = threads;
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index b62b97c..08d4b6e 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -21,6 +21,8 @@
 
 #include <pulsar/ClientConfiguration.h>
 
+#include <chrono>
+
 namespace pulsar {
 
 struct ClientConfigurationImpl {
@@ -28,7 +30,7 @@ struct ClientConfigurationImpl {
     uint64_t memoryLimit{0ull};
     int ioThreads{1};
     int connectionsPerBroker{1};
-    int operationTimeoutSeconds{30};
+    std::chrono::nanoseconds operationTimeout{30L * 1000 * 1000 * 1000};
     int messageListenerThreads{1};
     int concurrentLookupRequest{50000};
     int maxLookupRedirects{20};
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 82ab492..b2916bd 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -24,6 +24,7 @@
 #include <fstream>
 
 #include "AsioDefines.h"
+#include "ClientImpl.h"
 #include "Commands.h"
 #include "ConnectionPool.h"
 #include "ConsumerImpl.h"
@@ -163,7 +164,7 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
                                    const ClientConfiguration& 
clientConfiguration,
                                    const AuthenticationPtr& authentication, 
const std::string& clientVersion,
                                    ConnectionPool& pool, size_t poolIndex)
-    : 
operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())),
+    : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)),
       authentication_(authentication),
       serverProtocolVersion_(proto::ProtocolVersion_MIN),
       executor_(executor),
@@ -1278,6 +1279,7 @@ void ClientConnection::close(Result result, bool detach) {
     auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
     auto pendingGetLastMessageIdRequests = 
std::move(pendingGetLastMessageIdRequests_);
     auto pendingGetNamespaceTopicsRequests = 
std::move(pendingGetNamespaceTopicsRequests_);
+    auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);
 
     numOfPendingLookupRequest_ = 0;
 
@@ -1342,6 +1344,9 @@ void ClientConnection::close(Result result, bool detach) {
     for (auto& kv : pendingGetNamespaceTopicsRequests) {
         kv.second.setFailed(result);
     }
+    for (auto& kv : pendingGetSchemaRequests) {
+        kv.second.promise.setFailed(result);
+    }
 }
 
 bool ClientConnection::isClosed() const { return state_ == Disconnected; }
@@ -1430,6 +1435,7 @@ Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(
 Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& 
topicName,
                                                           const std::string& 
version, uint64_t requestId) {
     Lock lock(mutex_);
+
     Promise<Result, SchemaInfo> promise;
     if (isClosed()) {
         lock.unlock();
@@ -1438,8 +1444,27 @@ Future<Result, SchemaInfo> 
ClientConnection::newGetSchema(const std::string& top
         return promise.getFuture();
     }
 
-    pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
+    auto timer = executor_->createDeadlineTimer();
+    pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise, 
timer});
     lock.unlock();
+
+    auto weakSelf = weak_from_this();
+    timer->expires_from_now(operationsTimeout_);
+    timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        Lock lock(mutex_);
+        auto it = pendingGetSchemaRequests_.find(requestId);
+        if (it != pendingGetSchemaRequests_.end()) {
+            auto promise = std::move(it->second.promise);
+            pendingGetSchemaRequests_.erase(it);
+            lock.unlock();
+            promise.setFailed(ResultTimeout);
+        }
+    });
+
     sendCommand(Commands::newGetSchema(topicName, version, requestId));
     return promise.getFuture();
 }
@@ -1867,7 +1892,7 @@ void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResp
     Lock lock(mutex_);
     auto it = pendingGetSchemaRequests_.find(response.request_id());
     if (it != pendingGetSchemaRequests_.end()) {
-        Promise<Result, SchemaInfo> getSchemaPromise = it->second;
+        Promise<Result, SchemaInfo> getSchemaPromise = it->second.promise;
         pendingGetSchemaRequests_.erase(it);
         lock.unlock();
 
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 69155fd..1d44f05 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -42,6 +42,7 @@
 #include <functional>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "AsioTimer.h"
@@ -224,6 +225,11 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
         DeadlineTimerPtr timer;
     };
 
+    struct GetSchemaRequest {
+        Promise<Result, SchemaInfo> promise;
+        DeadlineTimerPtr timer;
+    };
+
     /*
      * handler for connectAsync
      * creates a ConnectionPtr which has a valid ClientConnection object
@@ -363,7 +369,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> 
PendingGetNamespaceTopicsMap;
     PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
 
-    typedef std::map<long, Promise<Result, SchemaInfo>> PendingGetSchemaMap;
+    typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
     PendingGetSchemaMap pendingGetSchemaRequests_;
 
     mutable std::mutex mutex_;
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 2cfbd38..76d4389 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -21,6 +21,7 @@
 #include <pulsar/ClientConfiguration.h>
 #include <pulsar/Version.h>
 
+#include <chrono>
 #include <random>
 #include <sstream>
 
@@ -109,7 +110,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration&
     }
 
     lookupServicePtr_ = RetryableLookupService::create(
-        underlyingLookupServicePtr, 
clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_);
+        underlyingLookupServicePtr, 
clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
 }
 
 ClientImpl::~ClientImpl() { shutdown(); }
@@ -768,4 +769,8 @@ std::string ClientImpl::getClientVersion(const 
ClientConfiguration& clientConfig
     return oss.str();
 }
 
+std::chrono::nanoseconds ClientImpl::getOperationTimeout(const 
ClientConfiguration& clientConfiguration) {
+    return clientConfiguration.impl_->operationTimeout;
+}
+
 } /* namespace pulsar */
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index f7b5a89..762aa60 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -125,6 +125,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
 
     ConnectionPool& getConnectionPool() noexcept { return pool_; }
 
+    static std::chrono::nanoseconds getOperationTimeout(const 
ClientConfiguration& clientConfiguration);
+
     friend class PulsarFriend;
 
    private:
diff --git a/lib/Future.h b/lib/Future.h
index 290ebc6..69db74a 100644
--- a/lib/Future.h
+++ b/lib/Future.h
@@ -141,7 +141,7 @@ class Promise {
     Future<Result, Type> getFuture() const { return Future<Result, 
Type>{state_}; }
 
    private:
-    const InternalStatePtr<Result, Type> state_;
+    InternalStatePtr<Result, Type> state_;
 };
 
 }  // namespace pulsar
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index b8e3e0d..561855f 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -18,6 +18,8 @@
  */
 #pragma once
 
+#include <chrono>
+
 #include "LookupDataResult.h"
 #include "LookupService.h"
 #include "NamespaceName.h"
@@ -81,15 +83,15 @@ class RetryableLookupService : public LookupService {
     RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_;
     RetryableOperationCachePtr<SchemaInfo> getSchemaCache_;
 
-    RetryableLookupService(std::shared_ptr<LookupService> lookupService, int 
timeoutSeconds,
+    RetryableLookupService(std::shared_ptr<LookupService> lookupService, 
TimeDuration timeout,
                            ExecutorServiceProviderPtr executorProvider)
         : lookupService_(lookupService),
-          
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, 
timeoutSeconds)),
+          
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, 
timeout)),
           partitionLookupCache_(
-              
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, 
timeoutSeconds)),
+              
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, 
timeout)),
           namespaceLookupCache_(
-              
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, 
timeoutSeconds)),
-          
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, 
timeoutSeconds)) {}
+              
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeout)),
+          
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, 
timeout)) {}
 };
 
 }  // namespace pulsar
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index 9c920da..dba190f 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -22,6 +22,7 @@
 
 #include <algorithm>
 #include <atomic>
+#include <chrono>
 #include <functional>
 #include <memory>
 
@@ -40,11 +41,11 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
         explicit PassKey() {}
     };
 
-    RetryableOperation(const std::string& name, std::function<Future<Result, 
T>()>&& func, int timeoutSeconds,
-                       DeadlineTimerPtr timer)
+    RetryableOperation(const std::string& name, std::function<Future<Result, 
T>()>&& func,
+                       TimeDuration timeout, DeadlineTimerPtr timer)
         : name_(name),
           func_(std::move(func)),
-          timeout_(std::chrono::seconds(timeoutSeconds)),
+          timeout_(timeout),
           backoff_(std::chrono::milliseconds(100), timeout_ + timeout_, 
std::chrono::milliseconds(0)),
           timer_(timer) {}
 
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index 70fa914..e42460d 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -18,6 +18,7 @@
  */
 #pragma once
 
+#include <chrono>
 #include <mutex>
 #include <unordered_map>
 
@@ -40,8 +41,8 @@ class RetryableOperationCache : public 
std::enable_shared_from_this<RetryableOpe
         explicit PassKey() {}
     };
 
-    RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, int 
timeoutSeconds)
-        : executorProvider_(executorProvider), timeoutSeconds_(timeoutSeconds) 
{}
+    RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, 
TimeDuration timeout)
+        : executorProvider_(executorProvider), timeout_(timeout) {}
 
     using Self = RetryableOperationCache<T>;
 
@@ -69,7 +70,7 @@ class RetryableOperationCache : public 
std::enable_shared_from_this<RetryableOpe
                 return promise.getFuture();
             }
 
-            auto operation = RetryableOperation<T>::create(key, 
std::move(func), timeoutSeconds_, timer);
+            auto operation = RetryableOperation<T>::create(key, 
std::move(func), timeout_, timer);
             auto future = operation->run();
             operations_[key] = operation;
             lock.unlock();
@@ -106,7 +107,7 @@ class RetryableOperationCache : public 
std::enable_shared_from_this<RetryableOpe
 
    private:
     ExecutorServiceProviderPtr executorProvider_;
-    const int timeoutSeconds_;
+    const TimeDuration timeout_;
 
     std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> 
operations_;
     mutable std::mutex mutex_;
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 3457bd4..0fe1385 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -22,11 +22,14 @@
 
 #include <algorithm>
 #include <boost/exception/all.hpp>
+#include <chrono>
 #include <future>
+#include <memory>
 #include <stdexcept>
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
+#include "PulsarWrapper.h"
 #include "lib/BinaryProtoLookupService.h"
 #include "lib/ClientConnection.h"
 #include "lib/ConnectionPool.h"
@@ -48,7 +51,10 @@ namespace pulsar {
 
 class LookupServiceTest : public ::testing::TestWithParam<std::string> {
    public:
-    void SetUp() override { client_ = Client{GetParam()}; }
+    void SetUp() override {
+        serviceUrl_ = GetParam();
+        client_ = Client{serviceUrl_};
+    }
     void TearDown() override { client_.close(); }
 
     template <typename T>
@@ -63,6 +69,7 @@ class LookupServiceTest : public 
::testing::TestWithParam<std::string> {
     }
 
    protected:
+    std::string serviceUrl_;
     Client client_{httpLookupUrl};
 };
 
@@ -159,7 +166,7 @@ TEST(LookupServiceTest, testRetry) {
     ClientConfiguration conf;
 
     auto lookupService = RetryableLookupService::create(
-        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, 
conf), 30 /* seconds */,
+        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, 
conf), std::chrono::seconds(30),
         executorProvider);
 
     PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
@@ -194,8 +201,8 @@ TEST(LookupServiceTest, testTimeout) {
 
     constexpr int timeoutInSeconds = 2;
     auto lookupService = RetryableLookupService::create(
-        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, 
conf), timeoutInSeconds,
-        executorProvider);
+        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, 
conf),
+        std::chrono::seconds(timeoutInSeconds), executorProvider);
     auto topicNamePtr = TopicName::get("lookup-service-test-retry");
 
     decltype(std::chrono::high_resolution_clock::now()) startTime;
@@ -431,6 +438,31 @@ TEST_P(LookupServiceTest, testGetSchemaByVersion) {
     producer2.close();
 }
 
+TEST_P(LookupServiceTest, testGetSchemaTimeout) {
+    if (serviceUrl_.find("pulsar://") == std::string::npos) {
+        // HTTP request timeout can only be configured with seconds
+        return;
+    }
+    const auto topic = "lookup-service-test-get-schema-timeout";
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setSchema(SchemaInfo(STRING, "", ""));
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf, producer));
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent("msg").build()));
+    client_.close();
+
+    ClientConfiguration conf;
+    PulsarWrapper::deref(conf).operationTimeout = std::chrono::nanoseconds(1);
+    client_ = Client{serviceUrl_, conf};
+    auto promise = std::make_shared<std::promise<Result>>();
+    client_.getSchemaInfoAsync(topic, 0L,
+                               [promise](Result result, const SchemaInfo&) { 
promise->set_value(result); });
+    auto future = promise->get_future();
+    ASSERT_EQ(future.wait_for(std::chrono::milliseconds(100)), 
std::future_status::ready);
+    ASSERT_EQ(future.get(), ResultTimeout);
+    client_.close();
+}
+
 INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, 
::testing::Values(binaryLookupUrl, httpLookupUrl));
 
 class BinaryProtoLookupServiceRedirectTestHelper : public 
BinaryProtoLookupService {
diff --git a/tests/PulsarWrapper.h b/tests/PulsarWrapper.h
new file mode 100644
index 0000000..87626e1
--- /dev/null
+++ b/tests/PulsarWrapper.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include "lib/ClientConfigurationImpl.h"
+#include "pulsar/ClientConfiguration.h"
+
+namespace pulsar {
+
+class PulsarWrapper {
+   public:
+    static ClientConfigurationImpl& deref(ClientConfiguration conf) { return 
*conf.impl_; }
+};
+
+}  // namespace pulsar
diff --git a/tests/RetryableOperationCacheTest.cc 
b/tests/RetryableOperationCacheTest.cc
index ea1eb69..2a6948e 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 
 #include <atomic>
+#include <chrono>
 #include <stdexcept>
 
 #include "lib/RetryableOperationCache.h"
@@ -82,7 +83,7 @@ class RetryableOperationCacheTest : public ::testing::Test {
 using namespace pulsar;
 
 TEST_F(RetryableOperationCacheTest, testRetry) {
-    auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds 
*/);
+    auto cache = RetryableOperationCache<int>::create(provider_, 
std::chrono::seconds(30));
     for (int i = 0; i < 10; i++) {
         futures_.emplace_back(cache->run("key-" + std::to_string(i), 
CountdownFunc{i * 100}));
     }
@@ -94,7 +95,7 @@ TEST_F(RetryableOperationCacheTest, testRetry) {
 }
 
 TEST_F(RetryableOperationCacheTest, testCache) {
-    auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds 
*/);
+    auto cache = RetryableOperationCache<int>::create(provider_, 
std::chrono::seconds(30));
     constexpr int numKeys = 5;
     for (int i = 0; i < 100; i++) {
         futures_.emplace_back(cache->run("key-" + std::to_string(i % numKeys), 
CountdownFunc{i * 100}));
@@ -107,7 +108,7 @@ TEST_F(RetryableOperationCacheTest, testCache) {
 }
 
 TEST_F(RetryableOperationCacheTest, testTimeout) {
-    auto cache = RetryableOperationCache<int>::create(provider_, 1 /* seconds 
*/);
+    auto cache = RetryableOperationCache<int>::create(provider_, 
std::chrono::seconds(1));
     auto future = cache->run("key", CountdownFunc{0, 1000 /* retry count */});
     try {
         wait(future);
@@ -118,7 +119,7 @@ TEST_F(RetryableOperationCacheTest, testTimeout) {
 }
 
 TEST_F(RetryableOperationCacheTest, testClear) {
-    auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds 
*/);
+    auto cache = RetryableOperationCache<int>::create(provider_, 
std::chrono::seconds(30));
     for (int i = 0; i < 10; i++) {
         futures_.emplace_back(cache->run("key-" + std::to_string(i), 
CountdownFunc{100}));
     }

Reply via email to