This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new e737716 Fix the wrong backoff computation when retrying (#296)
e737716 is described below
commit e73771694b612c92581243d2a715f9aae0ed2f7a
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jul 7 16:23:01 2023 +0800
Fix the wrong backoff computation when retrying (#296)
### Motivation
All the retryable operations share the same `Backoff` object in
`RetryableLookupService`, so if the reconnection happens for some times,
the delay of retrying will keeps the maximum value (30 seconds).
### Modifications
Refactor the design of the `RetryableLookupService`:
- Add a `RetryableOperation` class to represent a retryable operation,
each instance has its own `Backoff` object. The operation could only
be executed once.
- Add a `RetryableOperationCache` class to represent a map that maps a
specific name to its associated operation. It's an optimization that
if an operation (e.g. find the owner topic of topic A) was not
complete while the same operation was executed, the future would be
reused.
- In `RetryableLookupService`, just maintain some caches for different
operations.
- Add `RetryableOperationCacheTest` to verify the behaviors.
---
lib/ClientImpl.cc | 1 +
lib/LookupService.h | 2 +
lib/RetryableLookupService.h | 123 +++++++-------------------------
lib/RetryableOperation.h | 134 +++++++++++++++++++++++++++++++++++
lib/RetryableOperationCache.h | 117 ++++++++++++++++++++++++++++++
tests/LookupServiceTest.cc | 59 ++++++++-------
tests/PulsarFriend.h | 5 --
tests/RetryableOperationCacheTest.cc | 134 +++++++++++++++++++++++++++++++++++
tests/TableViewTest.cc | 2 +
9 files changed, 444 insertions(+), 133 deletions(-)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index c94434e..762d2b8 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -598,6 +598,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
state_ = Closing;
memoryLimitController_.close();
+ lookupServicePtr_->close();
auto producers = producers_.move();
auto consumers = consumers_.move();
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 7401121..35f4730 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -86,6 +86,8 @@ class LookupService {
const std::string& version =
"") = 0;
virtual ~LookupService() {}
+
+ virtual void close() {}
};
typedef std::shared_ptr<LookupService> LookupServicePtr;
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index c8fdaab..b8e3e0d 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -18,23 +18,17 @@
*/
#pragma once
-#include <algorithm>
-#include <memory>
-
-#include "Backoff.h"
-#include "ExecutorService.h"
-#include "LogUtils.h"
#include "LookupDataResult.h"
#include "LookupService.h"
-#include "SynchronizedHashMap.h"
+#include "NamespaceName.h"
+#include "RetryableOperationCache.h"
#include "TopicName.h"
namespace pulsar {
-class RetryableLookupService : public LookupService,
- public
std::enable_shared_from_this<RetryableLookupService> {
+class RetryableLookupService : public LookupService {
private:
- friend class PulsarFriend;
+ friend class LookupServiceTest;
struct PassKey {
explicit PassKey() {}
};
@@ -44,123 +38,58 @@ class RetryableLookupService : public LookupService,
explicit RetryableLookupService(PassKey, Args&&... args)
: RetryableLookupService(std::forward<Args>(args)...) {}
+ void close() override {
+ lookupCache_->clear();
+ partitionLookupCache_->clear();
+ namespaceLookupCache_->clear();
+ getSchemaCache_->clear();
+ }
+
template <typename... Args>
static std::shared_ptr<RetryableLookupService> create(Args&&... args) {
return std::make_shared<RetryableLookupService>(PassKey{},
std::forward<Args>(args)...);
}
LookupResultFuture getBroker(const TopicName& topicName) override {
- return executeAsync<LookupResult>("get-broker-" + topicName.toString(),
- [this, topicName] { return
lookupService_->getBroker(topicName); });
+ return lookupCache_->run("get-broker-" + topicName.toString(),
+ [this, topicName] { return
lookupService_->getBroker(topicName); });
}
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override {
- return executeAsync<LookupDataResultPtr>(
+ return partitionLookupCache_->run(
"get-partition-metadata-" + topicName->toString(),
[this, topicName] { return
lookupService_->getPartitionMetadataAsync(topicName); });
}
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
override {
- return executeAsync<NamespaceTopicsPtr>(
+ return namespaceLookupCache_->run(
"get-topics-of-namespace-" + nsName->toString(),
[this, nsName, mode] { return
lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
}
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const
std::string& version) override {
- return executeAsync<SchemaInfo>("get-schema" + topicName->toString(),
[this, topicName, version] {
+ return getSchemaCache_->run("get-schema" + topicName->toString(),
[this, topicName, version] {
return lookupService_->getSchema(topicName, version);
});
}
- template <typename T>
- Future<Result, T> executeAsync(const std::string& key,
std::function<Future<Result, T>()> f) {
- Promise<Result, T> promise;
- executeAsyncImpl(key, f, promise, timeout_);
- return promise.getFuture();
- }
-
private:
const std::shared_ptr<LookupService> lookupService_;
- const TimeDuration timeout_;
- Backoff backoff_;
- const ExecutorServiceProviderPtr executorProvider_;
-
- SynchronizedHashMap<std::string, DeadlineTimerPtr> backoffTimers_;
+ RetryableOperationCachePtr<LookupResult> lookupCache_;
+ RetryableOperationCachePtr<LookupDataResultPtr> partitionLookupCache_;
+ RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_;
+ RetryableOperationCachePtr<SchemaInfo> getSchemaCache_;
RetryableLookupService(std::shared_ptr<LookupService> lookupService, int
timeoutSeconds,
ExecutorServiceProviderPtr executorProvider)
: lookupService_(lookupService),
- timeout_(boost::posix_time::seconds(timeoutSeconds)),
- backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
- boost::posix_time::milliseconds(0)),
- executorProvider_(executorProvider) {}
-
- std::weak_ptr<RetryableLookupService> weak_from_this() noexcept { return
shared_from_this(); }
-
- // NOTE: Set the visibility to fix compilation error in GCC 6
- template <typename T>
-#ifndef _WIN32
- __attribute__((visibility("hidden")))
-#endif
- void
- executeAsyncImpl(const std::string& key, std::function<Future<Result,
T>()> f, Promise<Result, T> promise,
- TimeDuration remainingTime) {
- auto weakSelf = weak_from_this();
- f().addListener([this, weakSelf, key, f, promise,
remainingTime](Result result, const T& value) {
- auto self = weakSelf.lock();
- if (!self) {
- return;
- }
-
- if (result == ResultOk) {
- backoffTimers_.remove(key);
- promise.setValue(value);
- } else if (result == ResultRetryable) {
- if (remainingTime.total_milliseconds() <= 0) {
- backoffTimers_.remove(key);
- promise.setFailed(ResultTimeout);
- return;
- }
-
- DeadlineTimerPtr timerPtr;
- try {
- timerPtr = executorProvider_->get()->createDeadlineTimer();
- } catch (const std::runtime_error& e) {
- LOG_ERROR("Failed to retry lookup for " << key << ": " <<
e.what());
- promise.setFailed(ResultConnectError);
- return;
- }
- auto it = backoffTimers_.emplace(key, timerPtr);
- auto& timer = *(it.first->second);
- auto delay = std::min(backoff_.next(), remainingTime);
- timer.expires_from_now(delay);
-
- auto nextRemainingTime = remainingTime - delay;
- LOG_INFO("Reschedule " << key << " for " <<
delay.total_milliseconds()
- << " ms, remaining time: " <<
nextRemainingTime.total_milliseconds()
- << " ms");
- timer.async_wait([this, weakSelf, key, f, promise,
- nextRemainingTime](const
boost::system::error_code& ec) {
- auto self = weakSelf.lock();
- if (!self || ec) {
- if (self && ec !=
boost::asio::error::operation_aborted) {
- LOG_ERROR("The timer for " << key << " failed: "
<< ec.message());
- }
- // The lookup service has been destructed or the timer
has been cancelled
- promise.setFailed(ResultTimeout);
- return;
- }
- executeAsyncImpl(key, f, promise, nextRemainingTime);
- });
- } else {
- backoffTimers_.remove(key);
- promise.setFailed(result);
- }
- });
- }
-
- DECLARE_LOG_OBJECT()
+
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider,
timeoutSeconds)),
+ partitionLookupCache_(
+
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider,
timeoutSeconds)),
+ namespaceLookupCache_(
+
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider,
timeoutSeconds)),
+
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider,
timeoutSeconds)) {}
};
} // namespace pulsar
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
new file mode 100644
index 0000000..8a7b471
--- /dev/null
+++ b/lib/RetryableOperation.h
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Result.h>
+
+#include <algorithm>
+#include <atomic>
+#include <functional>
+#include <memory>
+
+#include "Backoff.h"
+#include "ExecutorService.h"
+#include "Future.h"
+#include "LogUtils.h"
+
+namespace pulsar {
+
+template <typename T>
+class RetryableOperation : public
std::enable_shared_from_this<RetryableOperation<T>> {
+ struct PassKey {
+ explicit PassKey() {}
+ };
+
+ RetryableOperation(const std::string& name, std::function<Future<Result,
T>()>&& func, int timeoutSeconds,
+ DeadlineTimerPtr timer)
+ : name_(name),
+ func_(std::move(func)),
+ timeout_(boost::posix_time::seconds(timeoutSeconds)),
+ backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
+ boost::posix_time::milliseconds(0)),
+ timer_(timer) {}
+
+ public:
+ template <typename... Args>
+ explicit RetryableOperation(PassKey, Args&&... args) :
RetryableOperation(std::forward<Args>(args)...) {}
+
+ template <typename... Args>
+ static std::shared_ptr<RetryableOperation<T>> create(Args&&... args) {
+ return std::make_shared<RetryableOperation<T>>(PassKey{},
std::forward<Args>(args)...);
+ }
+
+ Future<Result, T> run() {
+ bool expected = false;
+ if (!started_.compare_exchange_strong(expected, true)) {
+ return promise_.getFuture();
+ }
+ return runImpl(timeout_);
+ }
+
+ void cancel() {
+ promise_.setFailed(ResultDisconnected);
+ boost::system::error_code ec;
+ timer_->cancel(ec);
+ }
+
+ private:
+ const std::string name_;
+ std::function<Future<Result, T>()> func_;
+ const TimeDuration timeout_;
+ Backoff backoff_;
+ Promise<Result, T> promise_;
+ std::atomic_bool started_{false};
+ DeadlineTimerPtr timer_;
+
+ Future<Result, T> runImpl(TimeDuration remainingTime) {
+ std::weak_ptr<RetryableOperation<T>>
weakSelf{this->shared_from_this()};
+ func_().addListener([this, weakSelf, remainingTime](Result result,
const T& value) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ if (result == ResultOk) {
+ promise_.setValue(value);
+ return;
+ }
+ if (result != ResultRetryable) {
+ promise_.setFailed(result);
+ return;
+ }
+ if (remainingTime.total_milliseconds() <= 0) {
+ promise_.setFailed(ResultTimeout);
+ return;
+ }
+
+ auto delay = std::min(backoff_.next(), remainingTime);
+ timer_->expires_from_now(delay);
+
+ auto nextRemainingTime = remainingTime - delay;
+ LOG_INFO("Reschedule " << name_ << " for " <<
delay.total_milliseconds()
+ << " ms, remaining time: " <<
nextRemainingTime.total_milliseconds()
+ << " ms");
+ timer_->async_wait([this, weakSelf, nextRemainingTime](const
boost::system::error_code& ec) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ if (ec) {
+ if (ec == boost::asio::error::operation_aborted) {
+ LOG_DEBUG("Timer for " << name_ << " is cancelled");
+ promise_.setFailed(ResultTimeout);
+ } else {
+ LOG_WARN("Timer for " << name_ << " failed: " <<
ec.message());
+ }
+ } else {
+ LOG_DEBUG("Run operation " << name_ << ", remaining time: "
+ <<
nextRemainingTime.total_milliseconds() << " ms");
+ runImpl(nextRemainingTime);
+ }
+ });
+ });
+ return promise_.getFuture();
+ }
+
+ DECLARE_LOG_OBJECT()
+};
+
+} // namespace pulsar
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
new file mode 100644
index 0000000..70fa914
--- /dev/null
+++ b/lib/RetryableOperationCache.h
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <mutex>
+#include <unordered_map>
+
+#include "ExecutorService.h"
+#include "RetryableOperation.h"
+
+namespace pulsar {
+
+template <typename T>
+class RetryableOperationCache;
+
+template <typename T>
+using RetryableOperationCachePtr = std::shared_ptr<RetryableOperationCache<T>>;
+
+template <typename T>
+class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOperationCache<T>> {
+ friend class LookupServiceTest;
+ friend class RetryableOperationCacheTest;
+ struct PassKey {
+ explicit PassKey() {}
+ };
+
+ RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, int
timeoutSeconds)
+ : executorProvider_(executorProvider), timeoutSeconds_(timeoutSeconds)
{}
+
+ using Self = RetryableOperationCache<T>;
+
+ public:
+ template <typename... Args>
+ explicit RetryableOperationCache(PassKey, Args&&... args)
+ : RetryableOperationCache(std::forward<Args>(args)...) {}
+
+ template <typename... Args>
+ static std::shared_ptr<Self> create(Args&&... args) {
+ return std::make_shared<Self>(PassKey{}, std::forward<Args>(args)...);
+ }
+
+ Future<Result, T> run(const std::string& key, std::function<Future<Result,
T>()>&& func) {
+ std::unique_lock<std::mutex> lock{mutex_};
+ auto it = operations_.find(key);
+ if (it == operations_.end()) {
+ DeadlineTimerPtr timer;
+ try {
+ timer = executorProvider_->get()->createDeadlineTimer();
+ } catch (const std::runtime_error& e) {
+ LOG_ERROR("Failed to retry lookup for " << key << ": " <<
e.what());
+ Promise<Result, T> promise;
+ promise.setFailed(ResultConnectError);
+ return promise.getFuture();
+ }
+
+ auto operation = RetryableOperation<T>::create(key,
std::move(func), timeoutSeconds_, timer);
+ auto future = operation->run();
+ operations_[key] = operation;
+ lock.unlock();
+
+ std::weak_ptr<Self> weakSelf{this->shared_from_this()};
+ future.addListener([this, weakSelf, key, operation](Result, const
T&) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ std::lock_guard<std::mutex> lock{mutex_};
+ operations_.erase(key);
+ operation->cancel();
+ });
+
+ return future;
+ } else {
+ return it->second->run();
+ }
+ }
+
+ void clear() {
+ decltype(operations_) operations;
+ {
+ std::lock_guard<std::mutex> lock{mutex_};
+ operations.swap(operations_);
+ }
+ // cancel() could trigger the listener to erase the key from
operations, so we should use a swap way
+ // to release the lock here
+ for (auto&& kv : operations) {
+ kv.second->cancel();
+ }
+ }
+
+ private:
+ ExecutorServiceProviderPtr executorProvider_;
+ const int timeoutSeconds_;
+
+ std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>>
operations_;
+ mutable std::mutex mutex_;
+
+ DECLARE_LOG_OBJECT()
+};
+
+} // namespace pulsar
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 7712c4e..aff8409 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -37,8 +37,6 @@
#include "lib/RetryableLookupService.h"
#include "lib/TimeUtils.h"
-using namespace pulsar;
-
DECLARE_LOG_OBJECT()
static std::string binaryLookupUrl = "pulsar://localhost:6650";
@@ -46,6 +44,32 @@ static std::string httpLookupUrl = "http://localhost:8080";
extern std::string unique_str();
+namespace pulsar {
+
+class LookupServiceTest : public ::testing::TestWithParam<std::string> {
+ public:
+ void SetUp() override { client_ = Client{GetParam()}; }
+ void TearDown() override { client_.close(); }
+
+ template <typename T>
+ static bool isEmpty(const RetryableOperationCache<T>& cache) {
+ std::lock_guard<std::mutex> lock{cache.mutex_};
+ return cache.operations_.empty();
+ }
+
+ static size_t isEmpty(const RetryableLookupService& service) {
+ return isEmpty(*service.lookupCache_) &&
isEmpty(*service.partitionLookupCache_) &&
+ isEmpty(*service.namespaceLookupCache_) &&
isEmpty(*service.getSchemaCache_);
+ }
+
+ protected:
+ Client client_{httpLookupUrl};
+};
+
+} // namespace pulsar
+
+using namespace pulsar;
+
TEST(LookupServiceTest, basicLookup) {
ExecutorServiceProviderPtr service =
std::make_shared<ExecutorServiceProvider>(1);
AuthenticationPtr authData = AuthFactory::Disabled();
@@ -159,25 +183,7 @@ TEST(LookupServiceTest, testRetry) {
ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr));
LOG_INFO("getTopicPartitionName Async returns " <<
namespaceTopicsPtr->size() << " topics");
- std::atomic_int retryCount{0};
- constexpr int totalRetryCount = 3;
- auto future4 = lookupService->executeAsync<int>("key", [&retryCount]() ->
Future<Result, int> {
- Promise<Result, int> promise;
- if (++retryCount < totalRetryCount) {
- LOG_INFO("Retry count: " << retryCount);
- promise.setFailed(ResultRetryable);
- } else {
- LOG_INFO("Retry done with " << retryCount << " times");
- promise.setValue(100);
- }
- return promise.getFuture();
- });
- int customResult = 0;
- ASSERT_EQ(ResultOk, future4.get(customResult));
- ASSERT_EQ(customResult, 100);
- ASSERT_EQ(retryCount.load(), totalRetryCount);
-
- ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
+ ASSERT_TRUE(LookupServiceTest::isEmpty(*lookupService));
}
TEST(LookupServiceTest, testTimeout) {
@@ -221,18 +227,9 @@ TEST(LookupServiceTest, testTimeout) {
ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr));
afterMethod("getTopicsOfNamespaceAsync");
- ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
+ ASSERT_TRUE(LookupServiceTest::isEmpty(*lookupService));
}
-class LookupServiceTest : public ::testing::TestWithParam<std::string> {
- public:
- void SetUp() override { client_ = Client{GetParam()}; }
- void TearDown() override { client_.close(); }
-
- protected:
- Client client_{httpLookupUrl};
-};
-
TEST_P(LookupServiceTest, basicGetNamespaceTopics) {
Result result;
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 5a3f820..acbbf16 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -31,7 +31,6 @@
#include "lib/PartitionedProducerImpl.h"
#include "lib/ProducerImpl.h"
#include "lib/ReaderImpl.h"
-#include "lib/RetryableLookupService.h"
#include "lib/stats/ConsumerStatsImpl.h"
#include "lib/stats/ProducerStatsImpl.h"
@@ -181,10 +180,6 @@ class PulsarFriend {
setServiceUrlIndex(client.impl_->serviceNameResolver_, index);
}
- static size_t getNumberOfPendingTasks(const RetryableLookupService&
lookupService) {
- return lookupService.backoffTimers_.size();
- }
-
static proto::MessageMetadata& getMessageMetadata(Message& message) {
return message.impl_->metadata; }
static std::shared_ptr<MessageIdImpl> getMessageIdImpl(MessageId& msgId) {
return msgId.impl_; }
diff --git a/tests/RetryableOperationCacheTest.cc
b/tests/RetryableOperationCacheTest.cc
new file mode 100644
index 0000000..ea1eb69
--- /dev/null
+++ b/tests/RetryableOperationCacheTest.cc
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <stdexcept>
+
+#include "lib/RetryableOperationCache.h"
+
+namespace pulsar {
+
+using IntFuture = Future<Result, int>;
+
+static int wait(IntFuture future) {
+ int value;
+ auto result = future.get(value);
+ if (result != ResultOk) {
+ throw std::runtime_error(strResult(result));
+ }
+ return value;
+}
+
+class CountdownFunc {
+ const int result_;
+ const int totalRetryCount_ = 3;
+ std::atomic_int current_{0};
+
+ public:
+ CountdownFunc(int result, int totalRetryCount = 3) : result_(result),
totalRetryCount_(totalRetryCount) {}
+
+ CountdownFunc(const CountdownFunc& rhs)
+ : result_(rhs.result_), totalRetryCount_(rhs.totalRetryCount_),
current_(rhs.current_.load()) {}
+
+ IntFuture operator()() {
+ Promise<Result, int> promise;
+ if (++current_ < totalRetryCount_) {
+ promise.setFailed(ResultRetryable);
+ } else {
+ promise.setValue(result_);
+ }
+ return promise.getFuture();
+ }
+};
+
+class RetryableOperationCacheTest : public ::testing::Test {
+ protected:
+ void SetUp() override { provider_ =
std::make_shared<ExecutorServiceProvider>(1); }
+
+ void TearDown() override {
+ provider_->close();
+ futures_.clear();
+ }
+
+ template <typename T>
+ size_t getSize(const RetryableOperationCache<T>& cache) {
+ std::lock_guard<std::mutex> lock{cache.mutex_};
+ return cache.operations_.size();
+ }
+
+ ExecutorServiceProviderPtr provider_;
+ std::vector<IntFuture> futures_;
+};
+
+} // namespace pulsar
+
+using namespace pulsar;
+
+TEST_F(RetryableOperationCacheTest, testRetry) {
+ auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds
*/);
+ for (int i = 0; i < 10; i++) {
+ futures_.emplace_back(cache->run("key-" + std::to_string(i),
CountdownFunc{i * 100}));
+ }
+ ASSERT_EQ(getSize(*cache), 10);
+ for (int i = 0; i < 10; i++) {
+ ASSERT_EQ(wait(futures_[i]), i * 100);
+ }
+ ASSERT_EQ(getSize(*cache), 0);
+}
+
+TEST_F(RetryableOperationCacheTest, testCache) {
+ auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds
*/);
+ constexpr int numKeys = 5;
+ for (int i = 0; i < 100; i++) {
+ futures_.emplace_back(cache->run("key-" + std::to_string(i % numKeys),
CountdownFunc{i * 100}));
+ }
+ ASSERT_EQ(getSize(*cache), numKeys);
+ for (int i = 0; i < 100; i++) {
+ ASSERT_EQ(wait(futures_[i]), (i % numKeys) * 100);
+ }
+ ASSERT_EQ(getSize(*cache), 0);
+}
+
+TEST_F(RetryableOperationCacheTest, testTimeout) {
+ auto cache = RetryableOperationCache<int>::create(provider_, 1 /* seconds
*/);
+ auto future = cache->run("key", CountdownFunc{0, 1000 /* retry count */});
+ try {
+ wait(future);
+ FAIL();
+ } catch (const std::runtime_error& e) {
+ ASSERT_STREQ(e.what(), strResult(ResultTimeout));
+ }
+}
+
+TEST_F(RetryableOperationCacheTest, testClear) {
+ auto cache = RetryableOperationCache<int>::create(provider_, 30 /* seconds
*/);
+ for (int i = 0; i < 10; i++) {
+ futures_.emplace_back(cache->run("key-" + std::to_string(i),
CountdownFunc{100}));
+ }
+ ASSERT_EQ(getSize(*cache), 10);
+ cache->clear();
+ for (auto&& future : futures_) {
+ int value;
+ // All cancelled futures complete with ResultDisconnected and the
default int value
+ ASSERT_EQ(ResultDisconnected, future.get(value));
+ ASSERT_EQ(value, 0);
+ }
+ ASSERT_EQ(getSize(*cache), 0);
+}
diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc
index 5d42173..a645b9a 100644
--- a/tests/TableViewTest.cc
+++ b/tests/TableViewTest.cc
@@ -23,7 +23,9 @@
#include <future>
#include "HttpHelper.h"
+#include "LogUtils.h"
#include "PulsarFriend.h"
+#include "TopicName.h"
#include "WaitUtils.h"
using namespace pulsar;