This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 40259cc PIP-121: Implement AutoClusterFailover (#547)
40259cc is described below
commit 40259cc7afb788ea4d27291f2df134a752c1a714
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Mar 17 19:18:04 2026 +0800
PIP-121: Implement AutoClusterFailover (#547)
---
include/pulsar/AutoClusterFailover.h | 116 ++++++++++
lib/AutoClusterFailover.cc | 418 +++++++++++++++++++++++++++++++++++
tests/ServiceInfoProviderTest.cc | 218 ++++++++++++++++++
3 files changed, 752 insertions(+)
diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h
new file mode 100644
index 0000000..a9d7442
--- /dev/null
+++ b/include/pulsar/AutoClusterFailover.h
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
+#define PULSAR_AUTO_CLUSTER_FAILOVER_H_
+
+#include <pulsar/ServiceInfoProvider.h>
+
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace pulsar {
+
+class Client;
+class AutoClusterFailoverImpl;
+
+/**
+ * A ServiceInfoProvider that starts with the primary cluster and afterwards switches between the
+ * primary and the configured secondary clusters according to periodic availability probes.
+ */
+class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider {
+ public:
+    /**
+     * Settings consumed by AutoClusterFailover. Usually assembled through Builder.
+     */
+    struct Config {
+        // The preferred cluster; it is also the value returned by initialServiceInfo().
+        const ServiceInfo primary;
+        // Fallback clusters, probed in the order given.
+        const std::vector<ServiceInfo> secondary;
+        // Delay between the end of one availability check and the start of the next one.
+        std::chrono::milliseconds checkInterval{5000};  // 5 seconds
+        // Consecutive failed probes of the current cluster required before failing over.
+        uint32_t failoverThreshold{1};
+        // Consecutive successful probes of the primary required before switching back to it.
+        uint32_t switchBackThreshold{1};
+
+        Config(ServiceInfo primary, std::vector<ServiceInfo> secondary)
+            : primary(std::move(primary)), secondary(std::move(secondary)) {}
+    };
+
+    /**
+     * Builder helps create an AutoClusterFailover configuration.
+     *
+     * Example:
+     *   ServiceInfo primary{...};
+     *   std::vector<ServiceInfo> secondaries{...};
+     *   AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries)
+     *       .withCheckInterval(std::chrono::seconds(5))
+     *       .withFailoverThreshold(3)
+     *       .withSwitchBackThreshold(3)
+     *       .build();
+     *
+     * Notes:
+     * - primary: the preferred cluster to use when available.
+     * - secondary: ordered list of fallback clusters.
+     * - checkInterval: frequency of health probes.
+     * - failoverThreshold: the number of consecutive failed probes required before switching away from
+     *   the current cluster.
+     * - switchBackThreshold: the number of consecutive successful probes to the primary required before
+     *   switching back from a secondary while that secondary remains available. If the active secondary
+     *   becomes unavailable and the primary is available, the implementation may switch back to the
+     *   primary immediately, regardless of this threshold.
+     */
+    class Builder {
+     public:
+        Builder(ServiceInfo primary, std::vector<ServiceInfo> secondary)
+            : config_(std::move(primary), std::move(secondary)) {}
+
+        // Set how frequently probes run against the active cluster(s). Default: 5 seconds.
+        Builder& withCheckInterval(std::chrono::milliseconds interval) {
+            config_.checkInterval = interval;
+            return *this;
+        }
+
+        // Set the number of consecutive failed probes required before attempting failover. Default: 1.
+        Builder& withFailoverThreshold(uint32_t threshold) {
+            config_.failoverThreshold = threshold;
+            return *this;
+        }
+
+        // Set the number of consecutive successful primary probes required before switching back from a
+        // healthy secondary. If the active secondary becomes unavailable and the primary is available,
+        // the implementation may switch back immediately regardless of this threshold. Default: 1.
+        Builder& withSwitchBackThreshold(uint32_t threshold) {
+            config_.switchBackThreshold = threshold;
+            return *this;
+        }
+
+        // Create the provider from the settings collected so far.
+        AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); }
+
+     private:
+        Config config_;
+    };
+
+    // Create a provider from `config`. Probing does not start until initialize() is called.
+    explicit AutoClusterFailover(Config&& config);
+
+    ~AutoClusterFailover() final;
+
+    // Return the service info of the primary cluster.
+    ServiceInfo initialServiceInfo() final;
+
+    // Start background probing. `onServiceInfoUpdate` receives the service info of the newly selected
+    // cluster each time the provider switches clusters.
+    void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) final;
+
+ private:
+    std::shared_ptr<AutoClusterFailoverImpl> impl_;
+};
+
+} // namespace pulsar
+
+#endif
diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc
new file mode 100644
index 0000000..4fdfc1e
--- /dev/null
+++ b/lib/AutoClusterFailover.cc
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/AutoClusterFailover.h>
+
+#include <chrono>
+#include <functional>
+#include <future>
+#include <memory>
+#include <optional>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "AsioTimer.h"
+#include "LogUtils.h"
+#include "ServiceURI.h"
+#include "Url.h"
+
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/executor_work_guard.hpp>
+#include <asio/io_context.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/post.hpp>
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/steady_timer.hpp>
+#endif
+
+#include "AsioDefines.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+/**
+ * Background worker behind AutoClusterFailover.
+ *
+ * A dedicated thread runs an io_context. On it, a timer fires `config_.checkInterval` after the
+ * previous check has finished and probes the configured clusters by resolving each host of the service
+ * URL and opening a TCP connection to it. The failure counters, the current cluster and the
+ * `onServiceInfoUpdate_` callback are only touched from handlers running on that thread.
+ *
+ * Handlers capture a weak reference to this object and lock it while they run. Consequently the last
+ * strong reference may be dropped either by the owner or by a handler on the worker thread; the
+ * destructor copes with both cases.
+ */
+class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterFailoverImpl> {
+ public:
+    explicit AutoClusterFailoverImpl(AutoClusterFailover::Config&& config)
+        : config_(std::move(config)), currentServiceInfo_(&config_.primary) {}
+
+    /**
+     * Stops the io_context and shuts the worker thread down.
+     *
+     * When called from another thread, the worker is joined (a warning is logged if it needs more than
+     * 3 seconds). When the destructor is executed by the worker itself, the thread is detached instead.
+     */
+    ~AutoClusterFailoverImpl() {
+        using namespace std::chrono_literals;
+        if (!thread_.joinable()) {
+            // initialize() was never called, so there is nothing to stop.
+            return;
+        }
+
+        cancelTimer(*timer_);
+        workGuard_.reset();
+        ioContext_->stop();
+
+        if (thread_.get_id() == std::this_thread::get_id()) {
+            // A handler on the worker thread held the last strong reference, so the worker is running
+            // this destructor. Waiting for or joining ourselves can never succeed (join() would throw
+            // out of the destructor), hence detach. The thread owns its own reference to the io_context
+            // and leaves run() as soon as the current handler returns because stop() was called above.
+            thread_.detach();
+            return;
+        }
+
+        if (future_.wait_for(3s) != std::future_status::ready) {
+            LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, waiting for it to finish");
+        }
+        thread_.join();
+    }
+
+    // Returns a copy of the primary cluster's service info.
+    auto primary() const noexcept { return config_.primary; }
+
+    /**
+     * Stores the update callback and starts the worker thread, which schedules the first check.
+     *
+     * @param onServiceInfoUpdate invoked on the worker thread with the newly selected cluster every
+     *   time the current cluster changes
+     *
+     * Repeated calls are ignored with a warning because the worker can only be started once.
+     */
+    void initialize(std::function<void(ServiceInfo)>&& onServiceInfoUpdate) {
+        if (thread_.joinable()) {
+            // Assigning to a joinable std::thread would terminate the process.
+            LOG_WARN("AutoClusterFailoverImpl is already initialized, ignoring the repeated initialization");
+            return;
+        }
+
+        onServiceInfoUpdate_ = std::move(onServiceInfoUpdate);
+        workGuard_.emplace(ASIO::make_work_guard(*ioContext_));
+        timer_.emplace(*ioContext_);
+
+        auto weakSelf = weak_from_this();
+        ASIO::post(*ioContext_, [weakSelf] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        });
+
+        // The thread shares ownership of the io_context instead of capturing `this`, so that run()
+        // stays valid even when this object is destroyed from within one of the handlers.
+        std::promise<void> promise;
+        future_ = promise.get_future();
+        thread_ = std::thread([ioContext = ioContext_, promise = std::move(promise)]() mutable {
+            ioContext->run();
+            promise.set_value();
+        });
+    }
+
+ private:
+    // Upper bound for probing a single host (name resolution plus connect).
+    static constexpr std::chrono::milliseconds probeTimeout_{30000};
+    using CompletionCallback = std::function<void()>;
+    using ProbeCallback = std::function<void(bool)>;
+
+    // State of one in-flight host probe. It is owned by the handlers that capture it.
+    struct ProbeContext {
+        ASIO::ip::tcp::resolver resolver;
+        ASIO::ip::tcp::socket socket;
+        ASIO::steady_timer timer;
+        ProbeCallback callback;
+        // Set once the probe has reported its result; later completions are ignored.
+        bool done{false};
+        std::string hostUrl;
+
+        ProbeContext(ASIO::io_context& ioContext, std::string hostUrl, ProbeCallback callback)
+            : resolver(ioContext),
+              socket(ioContext),
+              timer(ioContext),
+              callback(std::move(callback)),
+              hostUrl(std::move(hostUrl)) {}
+    };
+
+    AutoClusterFailover::Config config_;
+    // Points either at `config_.primary` or at an element of `config_.secondary`.
+    const ServiceInfo* currentServiceInfo_;
+    uint32_t consecutiveFailureCount_{0};
+    uint32_t consecutivePrimaryRecoveryCount_{0};
+
+    std::thread thread_;
+    // Becomes ready when the worker thread has left io_context::run().
+    std::future<void> future_;
+
+    // Shared with the worker thread, see initialize().
+    std::shared_ptr<ASIO::io_context> ioContext_{std::make_shared<ASIO::io_context>()};
+    std::function<void(ServiceInfo)> onServiceInfoUpdate_;
+
+    std::optional<ASIO::executor_work_guard<ASIO::io_context::executor_type>> workGuard_;
+    std::optional<ASIO::steady_timer> timer_;
+
+    bool isUsingPrimary() const noexcept { return currentServiceInfo_ == &config_.primary; }
+
+    const ServiceInfo& current() const noexcept { return *currentServiceInfo_; }
+
+    // Arms the timer so that the next check runs `config_.checkInterval` from now.
+    void scheduleFailoverCheck() {
+        timer_->expires_after(config_.checkInterval);
+        auto weakSelf = weak_from_this();
+        timer_->async_wait([weakSelf](ASIO_ERROR error) {
+            if (error) {
+                // The wait was cancelled (or failed); stop rescheduling.
+                return;
+            }
+            if (auto self = weakSelf.lock()) {
+                self->executeFailoverCheck();
+            }
+        });
+    }
+
+    // Runs one check for the current cluster and schedules the next one once the check has completed.
+    void executeFailoverCheck() {
+        auto done = [weakSelf = weak_from_this()] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        };
+
+        if (isUsingPrimary()) {
+            checkAndFailoverToSecondaryAsync(std::move(done));
+        } else {
+            checkSecondaryAndPrimaryAsync(std::move(done));
+        }
+    }
+
+    // Reports the result of a host probe exactly once and releases the probe's pending operations.
+    static void completeProbe(const std::shared_ptr<ProbeContext>& context, bool success,
+                              const ASIO_ERROR& error = ASIO_SUCCESS) {
+        if (context->done) {
+            return;
+        }
+
+        context->done = true;
+        ASIO_ERROR ignored;
+        context->resolver.cancel();
+        context->socket.close(ignored);
+        context->timer.cancel(ignored);
+
+        if (!success) {
+            LOG_DEBUG("Probe of " << context->hostUrl << " failed: " << error.message());
+        }
+        context->callback(success);
+    }
+
+    // Probes a single host URL: the host is available when it resolves and accepts a TCP connection
+    // within `probeTimeout_`. An unparsable URL is reported as unavailable synchronously.
+    void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) {
+        Url parsedUrl;
+        if (!Url::parse(hostUrl, parsedUrl)) {
+            LOG_WARN("Failed to parse service URL for probing: " << hostUrl);
+            callback(false);
+            return;
+        }
+
+        auto context = std::make_shared<ProbeContext>(*ioContext_, hostUrl, std::move(callback));
+        context->timer.expires_after(probeTimeout_);
+        context->timer.async_wait([context](const ASIO_ERROR& error) {
+            if (!error) {
+                completeProbe(context, false, ASIO::error::timed_out);
+            }
+        });
+
+        context->resolver.async_resolve(
+            parsedUrl.host(), std::to_string(parsedUrl.port()),
+            [context](const ASIO_ERROR& error, const ASIO::ip::tcp::resolver::results_type& endpoints) {
+                if (error) {
+                    completeProbe(context, false, error);
+                    return;
+                }
+
+                ASIO::async_connect(
+                    context->socket, endpoints,
+                    [context](const ASIO_ERROR& connectError, const ASIO::ip::tcp::endpoint&) {
+                        completeProbe(context, !connectError, connectError);
+                    });
+            });
+    }
+
+    // Probes `hosts` one after another starting at `index`; reports true for the first available host
+    // and false once the list is exhausted.
+    void probeHostsAsync(const std::shared_ptr<std::vector<std::string>>& hosts, size_t index,
+                         ProbeCallback callback) {
+        if (index >= hosts->size()) {
+            callback(false);
+            return;
+        }
+
+        auto hostUrl = (*hosts)[index];
+        auto weakSelf = weak_from_this();
+        probeHostAsync(hostUrl,
+                       [weakSelf, hosts, index, callback = std::move(callback)](bool available) mutable {
+                           if (available) {
+                               callback(true);
+                               return;
+                           }
+                           if (auto self = weakSelf.lock()) {
+                               self->probeHostsAsync(hosts, index + 1, std::move(callback));
+                           }
+                       });
+    }
+
+    // Reports whether any host of the cluster's service URL is available. A service URL that cannot
+    // be parsed or that lists no hosts counts as unavailable.
+    void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback callback) {
+        try {
+            ServiceURI serviceUri{serviceInfo.serviceUrl()};
+            auto hosts = std::make_shared<std::vector<std::string>>(serviceUri.getServiceHosts());
+            if (hosts->empty()) {
+                callback(false);
+                return;
+            }
+            probeHostsAsync(hosts, 0, std::move(callback));
+        } catch (const std::exception& e) {
+            LOG_WARN("Failed to probe service URL " << serviceInfo.serviceUrl() << ": " << e.what());
+            callback(false);
+        }
+    }
+
+    // Makes `serviceInfo` the current cluster, resets both counters and notifies the update callback.
+    // Does nothing when `serviceInfo` already is the current cluster.
+    void switchTo(const ServiceInfo* serviceInfo) {
+        if (currentServiceInfo_ == serviceInfo) {
+            return;
+        }
+
+        LOG_INFO("Switch service URL from " << current().serviceUrl() << " to " << serviceInfo->serviceUrl());
+        currentServiceInfo_ = serviceInfo;
+        consecutiveFailureCount_ = 0;
+        consecutivePrimaryRecoveryCount_ = 0;
+        onServiceInfoUpdate_(current());
+    }
+
+    // Probes the secondaries in order starting at `index`, skipping `excludedServiceInfo`, and switches
+    // to the first available one. `callback` receives whether such a switch took place.
+    void probeSecondaryFrom(size_t index, const ServiceInfo* excludedServiceInfo, ProbeCallback callback) {
+        if (index >= config_.secondary.size()) {
+            callback(false);
+            return;
+        }
+
+        if (&config_.secondary[index] == excludedServiceInfo) {
+            probeSecondaryFrom(index + 1, excludedServiceInfo, std::move(callback));
+            return;
+        }
+
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(
+            config_.secondary[index],
+            [weakSelf, index, excludedServiceInfo, callback = std::move(callback)](bool available) mutable {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+
+                LOG_DEBUG("Detected secondary " << self->config_.secondary[index].serviceUrl()
+                                                << " availability: " << available);
+                if (available) {
+                    self->switchTo(&self->config_.secondary[index]);
+                    callback(true);
+                    return;
+                }
+
+                self->probeSecondaryFrom(index + 1, excludedServiceInfo, std::move(callback));
+            });
+    }
+
+    // Check while the primary is current: after `failoverThreshold` consecutive failed probes, fail
+    // over to the first available secondary.
+    void checkAndFailoverToSecondaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected primary " << self->current().serviceUrl()
+                                          << " availability: " << primaryAvailable);
+            if (primaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) {
+                done();
+                return;
+            }
+
+            self->probeSecondaryFrom(0, nullptr, [done = std::move(done)](bool) mutable { done(); });
+        });
+    }
+
+    // The current secondary has reached the failover threshold: prefer the primary if it is available
+    // (without applying `switchBackThreshold`), otherwise try the other secondaries in order.
+    void failoverFromUnavailableSecondaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(
+            config_.primary, [weakSelf, done = std::move(done)](bool primaryAvailable) mutable {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+
+                LOG_DEBUG("Detected primary while secondary is unavailable "
+                          << self->config_.primary.serviceUrl() << " availability: " << primaryAvailable);
+                if (primaryAvailable) {
+                    self->switchTo(&self->config_.primary);
+                    done();
+                    return;
+                }
+
+                self->probeSecondaryFrom(
+                    0, self->currentServiceInfo_,
+                    [weakSelf, done = std::move(done)](bool switchedToAnotherSecondary) mutable {
+                        auto self = weakSelf.lock();
+                        if (!self) {
+                            return;
+                        }
+
+                        if (switchedToAnotherSecondary) {
+                            done();
+                            return;
+                        }
+
+                        // Nothing is available. The primary was just seen down, so pass that result
+                        // as a hint; this resets the recovery counter without probing again.
+                        self->checkSwitchBackToPrimaryAsync(std::move(done), false);
+                    });
+            });
+    }
+
+    // Updates the primary recovery counter and switches back to the primary once it has been seen
+    // available `switchBackThreshold` times in a row. When `primaryAvailableHint` holds a value, it is
+    // used in place of a fresh probe of the primary.
+    void checkSwitchBackToPrimaryAsync(CompletionCallback done, std::optional<bool> primaryAvailableHint) {
+        auto handlePrimaryAvailable = [weakSelf = weak_from_this(),
+                                       done = std::move(done)](bool primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            if (!primaryAvailable) {
+                self->consecutivePrimaryRecoveryCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutivePrimaryRecoveryCount_ >= self->config_.switchBackThreshold) {
+                self->switchTo(&self->config_.primary);
+            }
+            done();
+        };
+
+        if (primaryAvailableHint.has_value()) {
+            handlePrimaryAvailable(*primaryAvailableHint);
+            return;
+        }
+
+        probeAvailableAsync(config_.primary, std::move(handlePrimaryAvailable));
+    }
+
+    // Check while a secondary is current: as long as the secondary is below the failover threshold,
+    // only evaluate switching back to the primary; otherwise fail over away from the secondary.
+    void checkSecondaryAndPrimaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool secondaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected secondary " << self->current().serviceUrl()
+                                            << " availability: " << secondaryAvailable);
+            if (secondaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt);
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < self->config_.failoverThreshold) {
+                self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt);
+                return;
+            }
+
+            self->failoverFromUnavailableSecondaryAsync(std::move(done));
+        });
+    }
+};
+
+// Creates the shared implementation from `config`. No background work starts until initialize() is
+// called.
+AutoClusterFailover::AutoClusterFailover(Config&& config)
+    : impl_(std::make_shared<AutoClusterFailoverImpl>(std::move(config))) {}
+
+// Defined here, where AutoClusterFailoverImpl is a complete type. Destruction only releases this
+// object's reference to the implementation.
+AutoClusterFailover::~AutoClusterFailover() = default;
+
+// Returns a copy of the primary cluster's service info.
+ServiceInfo AutoClusterFailover::initialServiceInfo() { return impl_->primary(); }
+
+// Hands the callback to the implementation, which starts its background thread and invokes the
+// callback from that thread whenever it switches to a different cluster.
+void AutoClusterFailover::initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) {
+    impl_->initialize(std::move(onServiceInfoUpdate));
+}
+
+} // namespace pulsar
diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc
index 82f5f6f..175c531 100644
--- a/tests/ServiceInfoProviderTest.cc
+++ b/tests/ServiceInfoProviderTest.cc
@@ -17,16 +17,20 @@
* under the License.
*/
#include <gtest/gtest.h>
+#include <pulsar/AutoClusterFailover.h>
#include <pulsar/Client.h>
#include <atomic>
+#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
+#include <vector>
#include "PulsarFriend.h"
#include "WaitUtils.h"
+#include "lib/AsioDefines.h"
#include "lib/LogUtils.h"
DECLARE_LOG_OBJECT()
@@ -34,6 +38,113 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
using namespace std::chrono_literals;
+namespace {
+
+// TCP listener used as a probe target in the tests below. Every accepted connection is closed right
+// away. The server can be stopped and later restarted on the port it was first bound to.
+class ProbeTcpServer {
+ public:
+    ProbeTcpServer() { start(); }
+
+    ~ProbeTcpServer() { stop(); }
+
+    // Binds the listener (port 0 on first use, the remembered port afterwards) and serves it from a
+    // dedicated thread. Does nothing while the server is already running.
+    void start() {
+        if (running_) {
+            return;
+        }
+
+        auto context = std::make_unique<ASIO::io_context>();
+        auto listener = std::make_unique<ASIO::ip::tcp::acceptor>(*context);
+        ASIO::ip::tcp::endpoint bindAddress{ASIO::ip::tcp::v4(), static_cast<unsigned short>(port_)};
+        listener->open(bindAddress.protocol());
+        listener->set_option(ASIO::ip::tcp::acceptor::reuse_address(true));
+        listener->bind(bindAddress);
+        listener->listen();
+
+        // Remember the port actually bound so that a restart reuses it.
+        port_ = listener->local_endpoint().port();
+        ioContext_ = std::move(context);
+        acceptor_ = std::move(listener);
+        running_ = true;
+
+        scheduleAccept();
+        serverThread_ = std::thread([this] { ioContext_->run(); });
+    }
+
+    // Closes the listener on the server thread, waits for that thread to finish and releases the
+    // listener and its io_context. Does nothing when the server is not running.
+    void stop() {
+        const bool wasRunning = running_.exchange(false);
+        if (!wasRunning) {
+            return;
+        }
+
+        ASIO::post(*ioContext_, [this] {
+            ASIO_ERROR ignored;
+            if (acceptor_ && acceptor_->is_open()) {
+                acceptor_->close(ignored);
+            }
+        });
+
+        if (serverThread_.joinable()) {
+            serverThread_.join();
+        }
+
+        acceptor_.reset();
+        ioContext_.reset();
+    }
+
+    std::string getServiceUrl() const { return "pulsar://127.0.0.1:" + std::to_string(port_); }
+
+ private:
+    // True while the server is running and owns an open listener.
+    bool canAccept() const { return running_ && acceptor_ && acceptor_->is_open(); }
+
+    // Waits for the next connection, closes it immediately and keeps accepting while possible.
+    void scheduleAccept() {
+        if (!canAccept()) {
+            return;
+        }
+
+        auto peer = std::make_shared<ASIO::ip::tcp::socket>(*ioContext_);
+        acceptor_->async_accept(*peer, [this, peer](const ASIO_ERROR &error) {
+            if (!error) {
+                ASIO_ERROR ignored;
+                peer->close(ignored);
+            }
+
+            if (canAccept()) {
+                scheduleAccept();
+            }
+        });
+    }
+
+    int port_{0};
+    std::atomic_bool running_{false};
+    std::unique_ptr<ASIO::io_context> ioContext_;
+    std::unique_ptr<ASIO::ip::tcp::acceptor> acceptor_;
+    std::thread serverThread_;
+};
+
+// Thread-safe recorder of the service URLs reported to a test, in the order they were received.
+class ServiceUrlObserver {
+ public:
+    // Appends the service URL of `serviceInfo` to the recorded history.
+    void onUpdate(const ServiceInfo &serviceInfo) {
+        std::lock_guard<std::mutex> guard(mutex_);
+        serviceUrls_.emplace_back(serviceInfo.serviceUrl());
+    }
+
+    // Number of URLs recorded so far.
+    size_t size() const {
+        std::lock_guard<std::mutex> guard(mutex_);
+        return serviceUrls_.size();
+    }
+
+    // Most recently recorded URL, or an empty string when nothing has been recorded yet.
+    std::string last() const {
+        std::lock_guard<std::mutex> guard(mutex_);
+        if (serviceUrls_.empty()) {
+            return std::string();
+        }
+        return serviceUrls_.back();
+    }
+
+    // Copy of the whole history.
+    std::vector<std::string> snapshot() const {
+        std::lock_guard<std::mutex> guard(mutex_);
+        return serviceUrls_;
+    }
+
+ private:
+    mutable std::mutex mutex_;
+    std::vector<std::string> serviceUrls_;
+};
+
+} // namespace
+
class ServiceInfoHolder {
public:
ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {}
@@ -93,6 +204,113 @@ class TestServiceInfoProvider : public ServiceInfoProvider {
mutable std::mutex mutex_;
};
+// The primary and the first secondary are down, so the provider must skip the first secondary and
+// fail over to the second one, but only after the failover threshold has been reached.
+TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) {
+    ProbeTcpServer reachableSecondary;
+    ProbeTcpServer downPrimary;
+    const auto primaryUrl = downPrimary.getServiceUrl();
+    downPrimary.stop();
+
+    ProbeTcpServer downSecondary;
+    const auto skippedSecondaryUrl = downSecondary.getServiceUrl();
+    downSecondary.stop();
+
+    const auto availableSecondaryUrl = reachableSecondary.getServiceUrl();
+    ServiceUrlObserver observer;
+    AutoClusterFailover provider =
+        AutoClusterFailover::Builder(ServiceInfo(primaryUrl),
+                                     {ServiceInfo(skippedSecondaryUrl), ServiceInfo(availableSecondaryUrl)})
+            .withCheckInterval(20ms)
+            .withFailoverThreshold(6)
+            .withSwitchBackThreshold(6)
+            .build();
+
+    ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl);
+
+    observer.onUpdate(provider.initialServiceInfo());
+    provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); });
+
+    const auto usesAvailableSecondary = [&observer, &availableSecondaryUrl] {
+        return observer.last() == availableSecondaryUrl;
+    };
+    // Six consecutive failed probes spaced at least 20ms apart are required, so no failover is
+    // expected within the first 80ms.
+    ASSERT_FALSE(waitUntil(80ms, usesAvailableSecondary));
+    ASSERT_TRUE(waitUntil(2s, usesAvailableSecondary));
+
+    // Exactly one switch happened and the unavailable secondary was never reported.
+    const std::vector<std::string> expectedUpdates{primaryUrl, availableSecondaryUrl};
+    ASSERT_EQ(observer.snapshot(), expectedUpdates);
+}
+
+// The provider fails over to the secondary while the primary is down and switches back once the
+// primary has been available for `switchBackThreshold` consecutive probes.
+TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) {
+    // Bind both listeners before stopping the primary. Otherwise the OS is free to hand the port
+    // just released by the primary to the secondary, which would make both URLs identical.
+    ProbeTcpServer primary;
+    ProbeTcpServer secondary;
+    const auto primaryUrl = primary.getServiceUrl();
+    const auto secondaryUrl = secondary.getServiceUrl();
+    ASSERT_NE(primaryUrl, secondaryUrl);
+    primary.stop();
+
+    ServiceUrlObserver observer;
+    AutoClusterFailover provider =
+        AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)})
+            .withCheckInterval(20ms)
+            .withFailoverThreshold(4)
+            .withSwitchBackThreshold(6)
+            .build();
+
+    observer.onUpdate(provider.initialServiceInfo());
+    provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); });
+
+    ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; }));
+
+    primary.start();
+
+    // Six consecutive successful probes spaced at least 20ms apart are required, so the switch back
+    // must not be observed within the first 80ms.
+    ASSERT_FALSE(waitUntil(80ms, [&observer, &primaryUrl] { return observer.last() == primaryUrl; }));
+    ASSERT_TRUE(waitUntil(2s, [&observer, &primaryUrl] { return observer.last() == primaryUrl; }));
+
+    const auto updates = observer.snapshot();
+    ASSERT_EQ(updates.size(), 3u);
+    ASSERT_EQ(updates[0], primaryUrl);
+    ASSERT_EQ(updates[1], secondaryUrl);
+    ASSERT_EQ(updates[2], primaryUrl);
+}
+
+// With the primary down, the provider first uses the first secondary and moves on to the second
+// secondary once the first one goes away as well.
+TEST(AutoClusterFailoverTest, testFailoverToAnotherSecondaryWhenCurrentSecondaryIsUnavailable) {
+    // Bind all listeners before stopping the primary. Otherwise the OS is free to hand the port just
+    // released by the primary to one of the secondaries, which would make two URLs identical.
+    ProbeTcpServer primary;
+    ProbeTcpServer firstSecondary;
+    ProbeTcpServer secondSecondary;
+    const auto primaryUrl = primary.getServiceUrl();
+    const auto firstSecondaryUrl = firstSecondary.getServiceUrl();
+    const auto secondSecondaryUrl = secondSecondary.getServiceUrl();
+    ASSERT_NE(primaryUrl, firstSecondaryUrl);
+    ASSERT_NE(primaryUrl, secondSecondaryUrl);
+    ASSERT_NE(firstSecondaryUrl, secondSecondaryUrl);
+    primary.stop();
+
+    ServiceUrlObserver observer;
+    AutoClusterFailover provider =
+        AutoClusterFailover::Builder(ServiceInfo(primaryUrl),
+                                     {ServiceInfo(firstSecondaryUrl), ServiceInfo(secondSecondaryUrl)})
+            .withCheckInterval(20ms)
+            .withFailoverThreshold(4)
+            .withSwitchBackThreshold(6)
+            .build();
+
+    observer.onUpdate(provider.initialServiceInfo());
+    provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); });
+
+    ASSERT_TRUE(
+        waitUntil(2s, [&observer, &firstSecondaryUrl] { return observer.last() == firstSecondaryUrl; }));
+
+    firstSecondary.stop();
+
+    ASSERT_TRUE(
+        waitUntil(2s, [&observer, &secondSecondaryUrl] { return observer.last() == secondSecondaryUrl; }));
+
+    const auto updates = observer.snapshot();
+    ASSERT_EQ(updates.size(), 3u);
+    ASSERT_EQ(updates[0], primaryUrl);
+    ASSERT_EQ(updates[1], firstSecondaryUrl);
+    ASSERT_EQ(updates[2], secondSecondaryUrl);
+}
+
TEST(ServiceInfoProviderTest, testSwitchCluster) {
extern std::string getToken(); // from tests/AuthTokenTest.cc
// Access "private/auth" namespace in cluster 1