This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 72b7311 Fix the operation timeout is not honored for GetSchema requests (#383)
72b7311 is described below
commit 72b7311aeef32e28a28e926da686aaf948e8f948
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jan 19 21:31:19 2024 +0800
Fix the operation timeout is not honored for GetSchema requests (#383)
---
lib/ClientConfiguration.cc | 7 +++++--
lib/ClientConfigurationImpl.h | 4 +++-
lib/ClientConnection.cc | 31 +++++++++++++++++++++++++---
lib/ClientConnection.h | 8 +++++++-
lib/ClientImpl.cc | 7 ++++++-
lib/ClientImpl.h | 2 ++
lib/Future.h | 2 +-
lib/RetryableLookupService.h | 12 ++++++-----
lib/RetryableOperation.h | 7 ++++---
lib/RetryableOperationCache.h | 9 ++++----
tests/LookupServiceTest.cc | 40 ++++++++++++++++++++++++++++++++----
tests/PulsarWrapper.h | 31 ++++++++++++++++++++++++++++
tests/RetryableOperationCacheTest.cc | 9 ++++----
13 files changed, 140 insertions(+), 29 deletions(-)
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 6e7c745..6a91ec1 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include <chrono>
#include <stdexcept>
#include "ClientConfigurationImpl.h"
@@ -61,11 +62,13 @@ Authentication& ClientConfiguration::getAuth() const {
return *impl_->authentica
const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return
impl_->authenticationPtr; }
ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int
timeout) {
- impl_->operationTimeoutSeconds = timeout;
+ impl_->operationTimeout = std::chrono::seconds(timeout);
return *this;
}
-int ClientConfiguration::getOperationTimeoutSeconds() const { return
impl_->operationTimeoutSeconds; }
+int ClientConfiguration::getOperationTimeoutSeconds() const {
+ return
std::chrono::duration_cast<std::chrono::seconds>(impl_->operationTimeout).count();
+}
ClientConfiguration& ClientConfiguration::setIOThreads(int threads) {
impl_->ioThreads = threads;
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index b62b97c..08d4b6e 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -21,6 +21,8 @@
#include <pulsar/ClientConfiguration.h>
+#include <chrono>
+
namespace pulsar {
struct ClientConfigurationImpl {
@@ -28,7 +30,7 @@ struct ClientConfigurationImpl {
uint64_t memoryLimit{0ull};
int ioThreads{1};
int connectionsPerBroker{1};
- int operationTimeoutSeconds{30};
+ std::chrono::nanoseconds operationTimeout{std::chrono::seconds(30)};  // 30 s default; avoids overflowing a 32-bit 'long' (LLP64)
int messageListenerThreads{1};
int concurrentLookupRequest{50000};
int maxLookupRedirects{20};
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 82ab492..b2916bd 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -24,6 +24,7 @@
#include <fstream>
#include "AsioDefines.h"
+#include "ClientImpl.h"
#include "Commands.h"
#include "ConnectionPool.h"
#include "ConsumerImpl.h"
@@ -163,7 +164,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
const ClientConfiguration&
clientConfiguration,
const AuthenticationPtr& authentication,
const std::string& clientVersion,
ConnectionPool& pool, size_t poolIndex)
- :
operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())),
+ : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)),
authentication_(authentication),
serverProtocolVersion_(proto::ProtocolVersion_MIN),
executor_(executor),
@@ -1278,6 +1279,7 @@ void ClientConnection::close(Result result, bool detach) {
auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
auto pendingGetLastMessageIdRequests =
std::move(pendingGetLastMessageIdRequests_);
auto pendingGetNamespaceTopicsRequests =
std::move(pendingGetNamespaceTopicsRequests_);
+ auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);
numOfPendingLookupRequest_ = 0;
@@ -1342,6 +1344,9 @@ void ClientConnection::close(Result result, bool detach) {
for (auto& kv : pendingGetNamespaceTopicsRequests) {
kv.second.setFailed(result);
}
+ for (auto& kv : pendingGetSchemaRequests) {
+ kv.second.promise.setFailed(result);
+ }
}
bool ClientConnection::isClosed() const { return state_ == Disconnected; }
@@ -1430,6 +1435,7 @@ Future<Result, NamespaceTopicsPtr>
ClientConnection::newGetTopicsOfNamespace(
Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string&
topicName,
const std::string&
version, uint64_t requestId) {
Lock lock(mutex_);
+
Promise<Result, SchemaInfo> promise;
if (isClosed()) {
lock.unlock();
@@ -1438,8 +1444,27 @@ Future<Result, SchemaInfo>
ClientConnection::newGetSchema(const std::string& top
return promise.getFuture();
}
- pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
+ auto timer = executor_->createDeadlineTimer();
+ pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise,
timer});
lock.unlock();
+
+ auto weakSelf = weak_from_this();
+ timer->expires_from_now(operationsTimeout_);
+ timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ Lock lock(mutex_);
+ auto it = pendingGetSchemaRequests_.find(requestId);
+ if (it != pendingGetSchemaRequests_.end()) {
+ auto promise = std::move(it->second.promise);
+ pendingGetSchemaRequests_.erase(it);
+ lock.unlock();
+ promise.setFailed(ResultTimeout);
+ }
+ });
+
sendCommand(Commands::newGetSchema(topicName, version, requestId));
return promise.getFuture();
}
@@ -1867,7 +1892,7 @@ void ClientConnection::handleGetSchemaResponse(const
proto::CommandGetSchemaResp
Lock lock(mutex_);
auto it = pendingGetSchemaRequests_.find(response.request_id());
if (it != pendingGetSchemaRequests_.end()) {
- Promise<Result, SchemaInfo> getSchemaPromise = it->second;
+ Promise<Result, SchemaInfo> getSchemaPromise = it->second.promise;
pendingGetSchemaRequests_.erase(it);
lock.unlock();
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 69155fd..1d44f05 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -42,6 +42,7 @@
#include <functional>
#include <memory>
#include <string>
+#include <unordered_map>
#include <vector>
#include "AsioTimer.h"
@@ -224,6 +225,11 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
DeadlineTimerPtr timer;
};
+ struct GetSchemaRequest {
+ Promise<Result, SchemaInfo> promise;
+ DeadlineTimerPtr timer;
+ };
+
/*
* handler for connectAsync
* creates a ConnectionPtr which has a valid ClientConnection object
@@ -363,7 +369,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>>
PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
- typedef std::map<long, Promise<Result, SchemaInfo>> PendingGetSchemaMap;
+ typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
PendingGetSchemaMap pendingGetSchemaRequests_;
mutable std::mutex mutex_;
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 2cfbd38..76d4389 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -21,6 +21,7 @@
#include <pulsar/ClientConfiguration.h>
#include <pulsar/Version.h>
+#include <chrono>
#include <random>
#include <sstream>
@@ -109,7 +110,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration&
}
lookupServicePtr_ = RetryableLookupService::create(
- underlyingLookupServicePtr,
clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_);
+ underlyingLookupServicePtr,
clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
}
ClientImpl::~ClientImpl() { shutdown(); }
@@ -768,4 +769,8 @@ std::string ClientImpl::getClientVersion(const
ClientConfiguration& clientConfig
return oss.str();
}
+std::chrono::nanoseconds ClientImpl::getOperationTimeout(const
ClientConfiguration& clientConfiguration) {
+ return clientConfiguration.impl_->operationTimeout;
+}
+
} /* namespace pulsar */
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index f7b5a89..762aa60 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -125,6 +125,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
ConnectionPool& getConnectionPool() noexcept { return pool_; }
+ static std::chrono::nanoseconds getOperationTimeout(const
ClientConfiguration& clientConfiguration);
+
friend class PulsarFriend;
private:
diff --git a/lib/Future.h b/lib/Future.h
index 290ebc6..69db74a 100644
--- a/lib/Future.h
+++ b/lib/Future.h
@@ -141,7 +141,7 @@ class Promise {
Future<Result, Type> getFuture() const { return Future<Result,
Type>{state_}; }
private:
- const InternalStatePtr<Result, Type> state_;
+ InternalStatePtr<Result, Type> state_;
};
} // namespace pulsar
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index b8e3e0d..561855f 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -18,6 +18,8 @@
*/
#pragma once
+#include <chrono>
+
#include "LookupDataResult.h"
#include "LookupService.h"
#include "NamespaceName.h"
@@ -81,15 +83,15 @@ class RetryableLookupService : public LookupService {
RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_;
RetryableOperationCachePtr<SchemaInfo> getSchemaCache_;
- RetryableLookupService(std::shared_ptr<LookupService> lookupService, int
timeoutSeconds,
+ RetryableLookupService(std::shared_ptr<LookupService> lookupService,
TimeDuration timeout,
ExecutorServiceProviderPtr executorProvider)
: lookupService_(lookupService),
-
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider,
timeoutSeconds)),
+
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider,
timeout)),
partitionLookupCache_(
-
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider,
timeoutSeconds)),
+
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider,
timeout)),
namespaceLookupCache_(
-
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider,
timeoutSeconds)),
-
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider,
timeoutSeconds)) {}
+
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeout)),
+
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider,
timeout)) {}
};
} // namespace pulsar
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index 9c920da..dba190f 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -22,6 +22,7 @@
#include <algorithm>
#include <atomic>
+#include <chrono>
#include <functional>
#include <memory>
@@ -40,11 +41,11 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
explicit PassKey() {}
};
- RetryableOperation(const std::string& name, std::function<Future<Result,
T>()>&& func, int timeoutSeconds,
- DeadlineTimerPtr timer)
+ RetryableOperation(const std::string& name, std::function<Future<Result,
T>()>&& func,
+ TimeDuration timeout, DeadlineTimerPtr timer)
: name_(name),
func_(std::move(func)),
- timeout_(std::chrono::seconds(timeoutSeconds)),
+ timeout_(timeout),
backoff_(std::chrono::milliseconds(100), timeout_ + timeout_,
std::chrono::milliseconds(0)),
timer_(timer) {}
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index 70fa914..e42460d 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <chrono>
#include <mutex>
#include <unordered_map>
@@ -40,8 +41,8 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
explicit PassKey() {}
};
- RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, int
timeoutSeconds)
- : executorProvider_(executorProvider), timeoutSeconds_(timeoutSeconds)
{}
+ RetryableOperationCache(ExecutorServiceProviderPtr executorProvider,
TimeDuration timeout)
+ : executorProvider_(executorProvider), timeout_(timeout) {}
using Self = RetryableOperationCache<T>;
@@ -69,7 +70,7 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
return promise.getFuture();
}
- auto operation = RetryableOperation<T>::create(key,
std::move(func), timeoutSeconds_, timer);
+ auto operation = RetryableOperation<T>::create(key,
std::move(func), timeout_, timer);
auto future = operation->run();
operations_[key] = operation;
lock.unlock();
@@ -106,7 +107,7 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
private:
ExecutorServiceProviderPtr executorProvider_;
- const int timeoutSeconds_;
+ const TimeDuration timeout_;
std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>>
operations_;
mutable std::mutex mutex_;
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 3457bd4..0fe1385 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -22,11 +22,14 @@
#include <algorithm>
#include <boost/exception/all.hpp>
+#include <chrono>
#include <future>
+#include <memory>
#include <stdexcept>
#include "HttpHelper.h"
#include "PulsarFriend.h"
+#include "PulsarWrapper.h"
#include "lib/BinaryProtoLookupService.h"
#include "lib/ClientConnection.h"
#include "lib/ConnectionPool.h"
@@ -48,7 +51,10 @@ namespace pulsar {
class LookupServiceTest : public ::testing::TestWithParam<std::string> {
public:
- void SetUp() override { client_ = Client{GetParam()}; }
+ void SetUp() override {
+ serviceUrl_ = GetParam();
+ client_ = Client{serviceUrl_};
+ }
void TearDown() override { client_.close(); }
template <typename T>
@@ -63,6 +69,7 @@ class LookupServiceTest : public
::testing::TestWithParam<std::string> {
}
protected:
+ std::string serviceUrl_;
Client client_{httpLookupUrl};
};
@@ -159,7 +166,7 @@ TEST(LookupServiceTest, testRetry) {
ClientConfiguration conf;
auto lookupService = RetryableLookupService::create(
- std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool,
conf), 30 /* seconds */,
+ std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool,
conf), std::chrono::seconds(30),
executorProvider);
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
@@ -194,8 +201,8 @@ TEST(LookupServiceTest, testTimeout) {
constexpr int timeoutInSeconds = 2;
auto lookupService = RetryableLookupService::create(
- std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool,
conf), timeoutInSeconds,
- executorProvider);
+ std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool,
conf),
+ std::chrono::seconds(timeoutInSeconds), executorProvider);
auto topicNamePtr = TopicName::get("lookup-service-test-retry");
decltype(std::chrono::high_resolution_clock::now()) startTime;
@@ -431,6 +438,31 @@ TEST_P(LookupServiceTest, testGetSchemaByVersion) {
producer2.close();
}
+TEST_P(LookupServiceTest, testGetSchemaTimeout) {
+ if (serviceUrl_.find("pulsar://") == std::string::npos) {
+ // HTTP request timeout can only be configured with seconds
+ return;
+ }
+ const auto topic = "lookup-service-test-get-schema-timeout";
+ Producer producer;
+ ProducerConfiguration producerConf;
+ producerConf.setSchema(SchemaInfo(STRING, "", ""));
+ ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf, producer));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("msg").build()));
+ client_.close();
+
+ ClientConfiguration conf;
+ PulsarWrapper::deref(conf).operationTimeout = std::chrono::nanoseconds(1);
+ client_ = Client{serviceUrl_, conf};
+ auto promise = std::make_shared<std::promise<Result>>();
+ client_.getSchemaInfoAsync(topic, 0L,
+ [promise](Result result, const SchemaInfo&) {
promise->set_value(result); });
+ auto future = promise->get_future();
+ ASSERT_EQ(future.wait_for(std::chrono::milliseconds(100)),
std::future_status::ready);
+ ASSERT_EQ(future.get(), ResultTimeout);
+ client_.close();
+}
+
INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest,
::testing::Values(binaryLookupUrl, httpLookupUrl));
class BinaryProtoLookupServiceRedirectTestHelper : public
BinaryProtoLookupService {
diff --git a/tests/PulsarWrapper.h b/tests/PulsarWrapper.h
new file mode 100644
index 0000000..87626e1
--- /dev/null
+++ b/tests/PulsarWrapper.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include "lib/ClientConfigurationImpl.h"
+#include "pulsar/ClientConfiguration.h"
+
+namespace pulsar {
+
+class PulsarWrapper {  // Test helper: gives tests mutable access to a ClientConfiguration's impl_
+ public:
+ static ClientConfigurationImpl& deref(const ClientConfiguration& conf) { return *conf.impl_; }
+};
+
+} // namespace pulsar
diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc
index ea1eb69..2a6948e 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include <atomic>
+#include <chrono>
#include <stdexcept>
#include "lib/RetryableOperationCache.h"
@@ -82,7 +83,7 @@ class RetryableOperationCacheTest : public ::testing::Test {
using namespace pulsar;
TEST_F(RetryableOperationCacheTest, testRetry) {
- auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds
*/);
+ auto cache = RetryableOperationCache<int>::create(provider_,
std::chrono::seconds(30));
for (int i = 0; i < 10; i++) {
futures_.emplace_back(cache->run("key-" + std::to_string(i),
CountdownFunc{i * 100}));
}
@@ -94,7 +95,7 @@ TEST_F(RetryableOperationCacheTest, testRetry) {
}
TEST_F(RetryableOperationCacheTest, testCache) {
- auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds
*/);
+ auto cache = RetryableOperationCache<int>::create(provider_,
std::chrono::seconds(30));
constexpr int numKeys = 5;
for (int i = 0; i < 100; i++) {
futures_.emplace_back(cache->run("key-" + std::to_string(i % numKeys),
CountdownFunc{i * 100}));
@@ -107,7 +108,7 @@ TEST_F(RetryableOperationCacheTest, testCache) {
}
TEST_F(RetryableOperationCacheTest, testTimeout) {
- auto cache = RetryableOperationCache<int>::create(provider_, 1 /* seconds
*/);
+ auto cache = RetryableOperationCache<int>::create(provider_,
std::chrono::seconds(1));
auto future = cache->run("key", CountdownFunc{0, 1000 /* retry count */});
try {
wait(future);
@@ -118,7 +119,7 @@ TEST_F(RetryableOperationCacheTest, testTimeout) {
}
TEST_F(RetryableOperationCacheTest, testClear) {
- auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds
*/);
+ auto cache = RetryableOperationCache<int>::create(provider_,
std::chrono::seconds(30));
for (int i = 0; i < 10; i++) {
futures_.emplace_back(cache->run("key-" + std::to_string(i),
CountdownFunc{100}));
}