This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d1dd08b [feat] PIP-307 added assigned broker urls for CloseProudcer
and CloseConsumer commands and handler logic (#389)
d1dd08b is described below
commit d1dd08ba3d9c964506ead1a9e3cde0cccad4621b
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Jan 24 06:39:17 2024 -0800
[feat] PIP-307 added assigned broker urls for CloseProudcer and
CloseConsumer commands and handler logic (#389)
Master Issue:
#[pip-307](https://github.com/apache/pulsar/blob/master/pip/pip-307.md)
### Motivation
As part of PIP-307, I am adding the client logic to this c++ client.
### Modifications
- Added assigned broker urls to CloseProudcer and CloseConsumer commands.
- Updated the client reconnection logic to directly connect to the assigned
broker urls
### Verifying this change
- Added ExtensibleLoadManagerTest to cover this logic
---
lib/BinaryProtoLookupService.cc | 4 +-
lib/ClientConnection.cc | 30 +++-
lib/ClientConnection.h | 4 +
lib/ClientImpl.cc | 35 ++++-
lib/ClientImpl.h | 7 +
lib/ConsumerImpl.cc | 9 +-
lib/ConsumerImpl.h | 1 +
lib/HandlerBase.cc | 29 ++--
lib/HandlerBase.h | 17 +-
lib/LookupService.h | 1 +
lib/ProducerImpl.cc | 9 +-
lib/ProducerImpl.h | 1 +
proto/PulsarApi.proto | 4 +
run-unit-tests.sh | 8 +
tests/BuildTests.cmake | 3 +
tests/LegacyBuildTests.cmake | 3 +
tests/extensibleLM/ExtensibleLoadManagerTest.cc | 198 ++++++++++++++++++++++++
tests/extensibleLM/docker-compose.yml | 150 ++++++++++++++++++
18 files changed, 492 insertions(+), 21 deletions(-)
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index 87f02ea..2d9ffc4 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -87,9 +87,9 @@ auto BinaryProtoLookupService::findBroker(const std::string&
address, bool autho
<< ", from " <<
cnx->cnxString());
if (data->shouldProxyThroughServiceUrl()) {
// logicalAddress is the proxy's address, we should still
connect through proxy
- promise->setValue({responseBrokerAddress, address});
+ promise->setValue({responseBrokerAddress, address, true});
} else {
- promise->setValue({responseBrokerAddress,
responseBrokerAddress});
+ promise->setValue({responseBrokerAddress,
responseBrokerAddress, false});
}
}
});
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index b2916bd..0641809 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1762,6 +1762,30 @@ void ClientConnection::handleError(const
proto::CommandError& error) {
}
}
+boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+ const proto::CommandCloseProducer& closeProducer) {
+ if (tlsSocket_) {
+ if (closeProducer.has_assignedbrokerserviceurltls()) {
+ return closeProducer.assignedbrokerserviceurltls();
+ }
+ } else if (closeProducer.has_assignedbrokerserviceurl()) {
+ return closeProducer.assignedbrokerserviceurl();
+ }
+ return boost::none;
+}
+
+boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+ const proto::CommandCloseConsumer& closeConsumer) {
+ if (tlsSocket_) {
+ if (closeConsumer.has_assignedbrokerserviceurltls()) {
+ return closeConsumer.assignedbrokerserviceurltls();
+ }
+ } else if (closeConsumer.has_assignedbrokerserviceurl()) {
+ return closeConsumer.assignedbrokerserviceurl();
+ }
+ return boost::none;
+}
+
void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer&
closeProducer) {
int producerId = closeProducer.producer_id();
@@ -1775,7 +1799,8 @@ void ClientConnection::handleCloseProducer(const
proto::CommandCloseProducer& cl
lock.unlock();
if (producer) {
- producer->disconnectProducer();
+ auto assignedBrokerServiceUrl =
getAssignedBrokerServiceUrl(closeProducer);
+ producer->disconnectProducer(assignedBrokerServiceUrl);
}
} else {
LOG_ERROR(cnxString_ << "Got invalid producer Id in closeProducer
command: " << producerId);
@@ -1795,7 +1820,8 @@ void ClientConnection::handleCloseConsumer(const
proto::CommandCloseConsumer& cl
lock.unlock();
if (consumer) {
- consumer->disconnectConsumer();
+ auto assignedBrokerServiceUrl =
getAssignedBrokerServiceUrl(closeconsumer);
+ consumer->disconnectConsumer(assignedBrokerServiceUrl);
}
} else {
LOG_ERROR(cnxString_ << "Got invalid consumer Id in closeConsumer
command: " << consumerId);
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 1d44f05..851ec0c 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -421,6 +421,10 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handleGetTopicOfNamespaceResponse(const
proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
void handleAckResponse(const proto::CommandAckResponse&);
+ boost::optional<std::string> getAssignedBrokerServiceUrl(
+ const proto::CommandCloseProducer& closeProducer);
+ boost::optional<std::string> getAssignedBrokerServiceUrl(
+ const proto::CommandCloseConsumer& closeConsumer);
};
} // namespace pulsar
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 76d4389..63c85b0 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -91,7 +91,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration&
ClientImpl::getClientVersion(clientConfiguration)),
producerIdGenerator_(0),
consumerIdGenerator_(0),
- closingError(ResultOk) {
+ closingError(ResultOk),
+ useProxy_(false),
+ lookupCount_(0L) {
std::unique_ptr<LoggerFactory> loggerFactory =
clientConfiguration_.impl_->takeLogger();
if (loggerFactory) {
LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -532,6 +534,8 @@ Future<Result, ClientConnectionPtr>
ClientImpl::getConnection(const std::string&
promise.setFailed(result);
return;
}
+ useProxy_ = data.proxyThroughServiceUrl;
+ lookupCount_++;
pool_.getConnectionAsync(data.logicalAddress,
data.physicalAddress, key)
.addListener([promise](Result result, const
ClientConnectionWeakPtr& weakCnx) {
if (result == ResultOk) {
@@ -550,6 +554,33 @@ Future<Result, ClientConnectionPtr>
ClientImpl::getConnection(const std::string&
return promise.getFuture();
}
+const std::string& ClientImpl::getPhysicalAddress(const std::string&
logicalAddress) {
+ if (useProxy_) {
+ return serviceNameResolver_.resolveHost();
+ } else {
+ return logicalAddress;
+ }
+}
+
+Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string&
logicalAddress, size_t key) {
+ const auto& physicalAddress = getPhysicalAddress(logicalAddress);
+ Promise<Result, ClientConnectionPtr> promise;
+ pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
+ .addListener([promise](Result result, const ClientConnectionWeakPtr&
weakCnx) {
+ if (result == ResultOk) {
+ auto cnx = weakCnx.lock();
+ if (cnx) {
+ promise.setValue(cnx);
+ } else {
+ promise.setFailed(ResultConnectError);
+ }
+ } else {
+ promise.setFailed(result);
+ }
+ });
+ return promise.getFuture();
+}
+
void ClientImpl::handleGetPartitions(const Result result, const
LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName,
GetPartitionsCallback callback) {
if (result != ResultOk) {
@@ -635,6 +666,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
if (*numberOfOpenHandlers == 0 && callback) {
handleClose(ResultOk, numberOfOpenHandlers, callback);
}
+ lookupCount_ = 0;
}
void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers,
ResultCallback callback) {
@@ -722,6 +754,7 @@ void ClientImpl::shutdown() {
partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
timeoutProcessor.tok();
LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
+ lookupCount_ = 0;
}
uint64_t ClientImpl::newProducerId() {
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 762aa60..a2649a6 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -97,6 +97,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
Future<Result, ClientConnectionPtr> getConnection(const std::string&
topic, size_t key);
+ Future<Result, ClientConnectionPtr> connect(const std::string&
logicalAddress, size_t key);
+
void closeAsync(CloseCallback callback);
void shutdown();
@@ -124,6 +126,7 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const {
return requestIdGenerator_; }
ConnectionPool& getConnectionPool() noexcept { return pool_; }
+ uint64_t getLookupCount() { return lookupCount_; }
static std::chrono::nanoseconds getOperationTimeout(const
ClientConfiguration& clientConfiguration);
@@ -160,6 +163,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
const std::string& consumerName,
const ConsumerConfiguration& conf,
SubscribeCallback callback);
+ const std::string& getPhysicalAddress(const std::string& logicalAddress);
+
static std::string getClientVersion(const ClientConfiguration&
clientConfiguration);
enum State
@@ -191,6 +196,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
std::atomic<Result> closingError;
+ std::atomic<bool> useProxy_;
+ std::atomic<uint64_t> lookupCount_;
friend class Client;
};
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index dbd3b65..86dddb0 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1234,10 +1234,13 @@ void ConsumerImpl::negativeAcknowledge(const MessageId&
messageId) {
negativeAcksTracker_->add(messageId);
}
-void ConsumerImpl::disconnectConsumer() {
- LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
+void ConsumerImpl::disconnectConsumer() { disconnectConsumer(boost::none); }
+
+void ConsumerImpl::disconnectConsumer(const boost::optional<std::string>&
assignedBrokerUrl) {
+ LOG_INFO("Broker notification of Closed consumer: "
+ << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " +
assignedBrokerUrl.get()) : ""));
resetCnx();
- scheduleReconnection();
+ scheduleReconnection(assignedBrokerUrl);
}
void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 0f801f6..5612091 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -131,6 +131,7 @@ class ConsumerImpl : public ConsumerImplBase {
void hasMessageAvailableAsync(HasMessageAvailableCallback callback)
override;
virtual void disconnectConsumer();
+ virtual void disconnectConsumer(const boost::optional<std::string>&
assignedBrokerUrl);
Result fetchSingleMessageFromBroker(Message& msg);
virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index fa21a57..c5327fc 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -68,7 +68,18 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
connection_ = cnx;
}
-void HandlerBase::grabCnx() {
+void HandlerBase::grabCnx() { grabCnx(boost::none); }
+
+Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
+ const ClientImplPtr& client, const boost::optional<std::string>&
assignedBrokerUrl) {
+ if (assignedBrokerUrl && client->getLookupCount() > 0) {
+ return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
+ } else {
+ return client->getConnection(topic(), connectionKeySuffix_);
+ }
+}
+
+void HandlerBase::grabCnx(const boost::optional<std::string>&
assignedBrokerUrl) {
bool expectedState = false;
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
LOG_INFO(getName() << "Ignoring reconnection attempt since there's
already a pending reconnection");
@@ -90,7 +101,7 @@ void HandlerBase::grabCnx() {
return;
}
auto self = shared_from_this();
- auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
+ auto cnxFuture = getConnection(client, assignedBrokerUrl);
cnxFuture.addListener([this, self](Result result, const
ClientConnectionPtr& cnx) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " <<
cnx->cnxString());
@@ -141,12 +152,12 @@ void HandlerBase::handleDisconnection(Result result,
const ClientConnectionPtr&
break;
}
}
-
-void HandlerBase::scheduleReconnection() {
+void HandlerBase::scheduleReconnection() { scheduleReconnection(boost::none); }
+void HandlerBase::scheduleReconnection(const boost::optional<std::string>&
assignedBrokerUrl) {
const auto state = state_.load();
if (state == Pending || state == Ready) {
- TimeDuration delay = backoff_.next();
+ TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0)
: backoff_.next();
LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay)
/ 1000.0) << " s");
timer_->expires_from_now(delay);
@@ -154,10 +165,10 @@ void HandlerBase::scheduleReconnection() {
// so we will not run into the case where grabCnx is invoked on out of
scope handler
auto name = getName();
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
- timer_->async_wait([name, weakSelf](const ASIO_ERROR& ec) {
+ timer_->async_wait([name, weakSelf, assignedBrokerUrl](const
ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (self) {
- self->handleTimeout(ec);
+ self->handleTimeout(ec, assignedBrokerUrl);
} else {
LOG_WARN(name << "Cancel the reconnection since the handler is
destroyed");
}
@@ -165,13 +176,13 @@ void HandlerBase::scheduleReconnection() {
}
}
-void HandlerBase::handleTimeout(const ASIO_ERROR& ec) {
+void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const
boost::optional<std::string>& assignedBrokerUrl) {
if (ec) {
LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec
<< "]");
return;
} else {
epoch_++;
- grabCnx();
+ grabCnx(assignedBrokerUrl);
}
}
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 68c0b6a..b9d9855 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -20,6 +20,7 @@
#define _PULSAR_HANDLER_BASE_HEADER_
#include <pulsar/Result.h>
+#include <boost/optional.hpp>
#include <memory>
#include <mutex>
#include <string>
@@ -53,11 +54,22 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
void resetCnx() { setCnx(nullptr); }
protected:
+ /*
+ * tries reconnection and sets connection_ to valid object
+ * @param assignedBrokerUrl assigned broker url to directly connect to
without lookup
+ */
+ void grabCnx(const boost::optional<std::string>& assignedBrokerUrl);
+
/*
* tries reconnection and sets connection_ to valid object
*/
void grabCnx();
+ /*
+ * Schedule reconnection after backoff time
+ * @param assignedBrokerUrl assigned broker url to directly connect to
without lookup
+ */
+ void scheduleReconnection(const boost::optional<std::string>&
assignedBrokerUrl);
/*
* Schedule reconnection after backoff time
*/
@@ -89,9 +101,12 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
private:
const std::shared_ptr<std::string> topic_;
+ Future<Result, ClientConnectionPtr> getConnection(const ClientImplPtr&
client,
+ const
boost::optional<std::string>& assignedBrokerUrl);
+
void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
- void handleTimeout(const ASIO_ERROR& ec);
+ void handleTimeout(const ASIO_ERROR& ec, const
boost::optional<std::string>& assignedBrokerUrl);
protected:
ClientImplWeakPtr client_;
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 35f4730..b50d1f8 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -42,6 +42,7 @@ class LookupService {
struct LookupResult {
std::string logicalAddress;
std::string physicalAddress;
+ bool proxyThroughServiceUrl;
friend std::ostream& operator<<(std::ostream& os, const LookupResult&
lookupResult) {
return os << "logical address: " << lookupResult.logicalAddress
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index fc39b23..2e5cd44 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -971,12 +971,15 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata&
metadata, SharedBuffer
encryptedPayload);
}
-void ProducerImpl::disconnectProducer() {
- LOG_INFO("Broker notification of Closed producer: " << producerId_);
+void ProducerImpl::disconnectProducer(const boost::optional<std::string>&
assignedBrokerUrl) {
+ LOG_INFO("Broker notification of Closed producer: "
+ << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " +
assignedBrokerUrl.get()) : ""));
resetCnx();
- scheduleReconnection();
+ scheduleReconnection(assignedBrokerUrl);
}
+void ProducerImpl::disconnectProducer() { disconnectProducer(boost::none); }
+
void ProducerImpl::start() {
HandlerBase::start();
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 9781605..f650b1f 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -98,6 +98,7 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
bool ackReceived(uint64_t sequenceId, MessageId& messageId);
+ virtual void disconnectProducer(const boost::optional<std::string>&
assignedBrokerUrl);
virtual void disconnectProducer();
uint64_t getProducerId() const;
diff --git a/proto/PulsarApi.proto b/proto/PulsarApi.proto
index 7be6522..a2548f3 100644
--- a/proto/PulsarApi.proto
+++ b/proto/PulsarApi.proto
@@ -623,11 +623,15 @@ message CommandReachedEndOfTopic {
message CommandCloseProducer {
required uint64 producer_id = 1;
required uint64 request_id = 2;
+ optional string assignedBrokerServiceUrl = 3;
+ optional string assignedBrokerServiceUrlTls = 4;
}
message CommandCloseConsumer {
required uint64 consumer_id = 1;
required uint64 request_id = 2;
+ optional string assignedBrokerServiceUrl = 3;
+ optional string assignedBrokerServiceUrlTls = 4;
}
message CommandRedeliverUnacknowledgedMessages {
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 898cfed..226789c 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -30,6 +30,14 @@ fi
export http_proxy=
export https_proxy=
+
+# Run ExtensibleLoadManager tests
+docker compose -f tests/extensibleLM/docker-compose.yml up -d
+until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
+sleep 5
+$CMAKE_BUILD_DIRECTORY/tests/ExtensibleLoadManagerTest
+docker compose -f tests/extensibleLM/docker-compose.yml down
+
# Run OAuth2 tests
docker compose -f tests/oauth2/docker-compose.yml up -d
# Wait until the namespace is created, currently there is no good way to check
it
diff --git a/tests/BuildTests.cmake b/tests/BuildTests.cmake
index c468f51..0fe7430 100644
--- a/tests/BuildTests.cmake
+++ b/tests/BuildTests.cmake
@@ -55,3 +55,6 @@ target_link_libraries(Oauth2Test pulsarStatic
${GTEST_TARGETS})
add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
target_link_libraries(ChunkDedupTest pulsarStatic ${GTEST_TARGETS})
+
+add_executable(ExtensibleLoadManagerTest
extensibleLM/ExtensibleLoadManagerTest.cc HttpHelper.cc)
+target_link_libraries(ExtensibleLoadManagerTest PRIVATE pulsarStatic
${GTEST_TARGETS})
diff --git a/tests/LegacyBuildTests.cmake b/tests/LegacyBuildTests.cmake
index 1e5335f..c6f4e96 100644
--- a/tests/LegacyBuildTests.cmake
+++ b/tests/LegacyBuildTests.cmake
@@ -75,3 +75,6 @@ target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic
${GTEST_LIBRARY_PAT
add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic
${GTEST_LIBRARY_PATH})
+add_executable(ExtensibleLoadManagerTest
extensibleLM/ExtensibleLoadManagerTest.cc HttpHelper.cc)
+target_include_directories(ExtensibleLoadManagerTest PRIVATE
${AUTOGEN_DIR}/lib)
+target_link_libraries(ExtensibleLoadManagerTest PRIVATE pulsarStatic
${GTEST_LIBRARY_PATH})
diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc
b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
new file mode 100644
index 0000000..4472d9d
--- /dev/null
+++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Run `docker-compose up -d` to set up the test environment for this test.
+#include <gtest/gtest.h>
+
+#include <thread>
+
+#include "include/pulsar/Client.h"
+#include "lib/LogUtils.h"
+#include "lib/Semaphore.h"
+#include "tests/HttpHelper.h"
+#include "tests/PulsarFriend.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+bool checkTime() {
+ const static auto start = std::chrono::high_resolution_clock::now();
+ auto end = std::chrono::high_resolution_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end
- start).count();
+ return duration < 180 * 1000;
+}
+
+TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
+ const static std::string adminUrl = "http://localhost:8080/";
+ const static std::string topicName =
+ "persistent://public/unload-test/topic-1" + std::to_string(time(NULL));
+
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+ std::string url = adminUrl +
"admin/v2/namespaces/public/unload-test?bundles=1";
+ int res = makePutRequest(url, "");
+ return res == 204 || res == 409;
+ }));
+
+ Client client{"pulsar://localhost:6650"};
+ Producer producer;
+ ProducerConfiguration producerConfiguration;
+ Result producerResult = client.createProducer(topicName,
producerConfiguration, producer);
+ ASSERT_EQ(producerResult, ResultOk);
+ Consumer consumer;
+ Result consumerResult = client.subscribe(topicName, "sub", consumer);
+ ASSERT_EQ(consumerResult, ResultOk);
+
+ Semaphore firstUnloadSemaphore(0);
+ Semaphore secondUnloadSemaphore(0);
+ Semaphore halfPubWaitSemaphore(0);
+ const int msgCount = 10;
+ int produced = 0;
+ auto produce = [&]() {
+ int i = 0;
+ while (i < msgCount && checkTime()) {
+ if (i == 3) {
+ firstUnloadSemaphore.acquire();
+ }
+
+ if (i == 5) {
+ halfPubWaitSemaphore.release();
+ }
+
+ if (i == 8) {
+ secondUnloadSemaphore.acquire();
+ }
+
+ std::string content = std::to_string(i);
+ const auto msg = MessageBuilder().setContent(content).build();
+
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+ Result sendResult = producer.send(msg);
+ return sendResult == ResultOk;
+ }));
+
+ LOG_INFO("produced index:" << i);
+ produced++;
+ i++;
+ }
+ LOG_INFO("producer finished");
+ };
+
+ int consumed = 0;
+ auto consume = [&]() {
+ Message receivedMsg;
+ int i = 0;
+ while (i < msgCount && checkTime()) {
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+ Result receiveResult =
+ consumer.receive(receivedMsg, 1000); // Assumed that we
wait 1000 ms for each message
+ return receiveResult == ResultOk;
+ }));
+ LOG_INFO("received index:" << i);
+
+ int id = std::stoi(receivedMsg.getDataAsString());
+ if (id < i) {
+ continue;
+ }
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+ Result ackResult = consumer.acknowledge(receivedMsg);
+ return ackResult == ResultOk;
+ }));
+ LOG_INFO("acked index:" << i);
+
+ consumed++;
+ i++;
+ }
+ LOG_INFO("consumer finished");
+ };
+
+ std::thread produceThread(produce);
+ std::thread consumeThread(consume);
+
+ auto unload = [&] {
+ auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+ auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+ auto &producerImpl = PulsarFriend::getProducerImpl(producer);
+ uint64_t lookupCountBeforeUnload;
+ std::string destinationBroker;
+ while (checkTime()) {
+ // make sure producers and consumers are ready
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
+ [&] { return consumerImpl.isConnected() &&
producerImpl.isConnected(); }));
+
+ std::string url = adminUrl +
"lookup/v2/topic/persistent/public/unload-test/topic-1";
+ std::string responseDataBeforeUnload;
+ int res = makeGetRequest(url, responseDataBeforeUnload);
+ if (res != 200) {
+ continue;
+ }
+ destinationBroker = responseDataBeforeUnload.find("broker-2") ==
std::string::npos
+ ? "broker-2:8080"
+ : "broker-1:8080";
+ lookupCountBeforeUnload = clientImplPtr->getLookupCount();
+ ASSERT_TRUE(lookupCountBeforeUnload > 0);
+
+ url = adminUrl +
+
"admin/v2/namespaces/public/unload-test/0x00000000_0xffffffff/unload?destinationBroker="
+
+ destinationBroker;
+ LOG_INFO("before lookup responseData:" << responseDataBeforeUnload
<< ",unload url:" << url
+ <<
",lookupCountBeforeUnload:" << lookupCountBeforeUnload);
+ res = makePutRequest(url, "");
+ LOG_INFO("unload res:" << res);
+ if (res != 204) {
+ continue;
+ }
+
+ // make sure producers and consumers are ready
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
+ [&] { return consumerImpl.isConnected() &&
producerImpl.isConnected(); }));
+ std::string responseDataAfterUnload;
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+ url = adminUrl +
"lookup/v2/topic/persistent/public/unload-test/topic-1";
+ res = makeGetRequest(url, responseDataAfterUnload);
+ return res == 200 &&
responseDataAfterUnload.find(destinationBroker) != std::string::npos;
+ }));
+ LOG_INFO("after lookup responseData:" << responseDataAfterUnload
<< ",res:" << res);
+
+ // TODO: check lookup counter after pip-307 is released
+ auto lookupCountAfterUnload = clientImplPtr->getLookupCount();
+ ASSERT_TRUE(lookupCountBeforeUnload < lookupCountAfterUnload);
+ break;
+ }
+ };
+ LOG_INFO("starting first unload");
+ unload();
+ firstUnloadSemaphore.release();
+ halfPubWaitSemaphore.acquire();
+ LOG_INFO("starting second unload");
+ unload();
+ secondUnloadSemaphore.release();
+
+ produceThread.join();
+ consumeThread.join();
+ ASSERT_EQ(consumed, msgCount);
+ ASSERT_EQ(produced, msgCount);
+ ASSERT_TRUE(checkTime()) << "timed out";
+ client.close();
+}
+
+int main(int argc, char *argv[]) {
+ ::testing::InitGoogleTest(&argc, argv);
+
+ return RUN_ALL_TESTS();
+}
diff --git a/tests/extensibleLM/docker-compose.yml
b/tests/extensibleLM/docker-compose.yml
new file mode 100644
index 0000000..8d3c33a
--- /dev/null
+++ b/tests/extensibleLM/docker-compose.yml
@@ -0,0 +1,150 @@
+version: '3'
+networks:
+ pulsar:
+ driver: bridge
+services:
+ # Start ZooKeeper
+ zookeeper:
+ image: apachepulsar/pulsar:latest
+ container_name: zookeeper
+ restart: on-failure
+ networks:
+ - pulsar
+ environment:
+ - metadataStoreUrl=zk:zookeeper:2181
+ - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+ command: >
+ bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
+ bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
+ exec bin/pulsar zookeeper"
+ healthcheck:
+ test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
+ interval: 10s
+ timeout: 5s
+ retries: 30
+
+ # Initialize cluster metadata
+ pulsar-init:
+ container_name: pulsar-init
+ hostname: pulsar-init
+ image: apachepulsar/pulsar:latest
+ networks:
+ - pulsar
+ environment:
+ - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+ command: >
+ bin/pulsar initialize-cluster-metadata \
+ --cluster cluster-a \
+ --zookeeper zookeeper:2181 \
+ --configuration-store zookeeper:2181 \
+ --web-service-url http://broker-1:8080 \
+ --broker-service-url pulsar://broker-1:6650
+ depends_on:
+ zookeeper:
+ condition: service_healthy
+
+ # Start bookie
+ bookie:
+ image: apachepulsar/pulsar:latest
+ container_name: bookie
+ restart: on-failure
+ networks:
+ - pulsar
+ environment:
+ - clusterName=cluster-a
+ - zkServers=zookeeper:2181
+ - metadataServiceUri=metadata-store:zk:zookeeper:2181
+ - advertisedAddress=bookie
+ - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+ depends_on:
+ zookeeper:
+ condition: service_healthy
+ pulsar-init:
+ condition: service_completed_successfully
+ command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf &&
exec bin/pulsar bookie"
+
+ proxy:
+ image: apachepulsar/pulsar:latest
+ container_name: proxy
+ hostname: proxy
+ restart: on-failure
+ networks:
+ - pulsar
+ environment:
+ - metadataStoreUrl=zk:zookeeper:2181
+ - zookeeperServers=zookeeper:2181
+ - clusterName=cluster-a
+ - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+ ports:
+ - "8080:8080"
+ - "6650:6650"
+ depends_on:
+ broker-1:
+ condition: service_started
+ broker-2:
+ condition: service_started
+ command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec
bin/pulsar proxy"
+
+ # Start broker 1
+ broker-1:
+ image: apachepulsar/pulsar:latest
+ container_name: broker-1
+ hostname: broker-1
+ restart: on-failure
+ networks:
+ - pulsar
+ environment:
+ - metadataStoreUrl=zk:zookeeper:2181
+ - zookeeperServers=zookeeper:2181
+ - clusterName=cluster-a
+ - managedLedgerDefaultEnsembleSize=1
+ - managedLedgerDefaultWriteQuorum=1
+ - managedLedgerDefaultAckQuorum=1
+ - advertisedAddress=broker-1
+ - internalListenerName=internal
+ - advertisedListeners=internal:pulsar://broker-1:6650
+ - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+ # Load Manager. Here uses the extensible load balancer, sets the
unloading strategy to TransferShedder, and enables debug mode.
+ -
loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
+ -
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
+ - loadBalancerSheddingEnabled=false
+ - loadBalancerDebugModeEnabled=true
+ - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ depends_on:
+ zookeeper:
+ condition: service_healthy
+ bookie:
+ condition: service_started
+ command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec
bin/pulsar broker"
+
+ # Start broker 2
+ broker-2:
+ image: apachepulsar/pulsar:latest
+ container_name: broker-2
+ hostname: broker-2
+ restart: on-failure
+ networks:
+ - pulsar
+ environment:
+ - metadataStoreUrl=zk:zookeeper:2181
+ - zookeeperServers=zookeeper:2181
+ - clusterName=cluster-a
+ - managedLedgerDefaultEnsembleSize=1
+ - managedLedgerDefaultWriteQuorum=1
+ - managedLedgerDefaultAckQuorum=1
+ - advertisedAddress=broker-2
+ - internalListenerName=internal
+ - advertisedListeners=internal:pulsar://broker-2:6650
+ - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+ # Load Manager. Here uses the extensible load balancer, sets the
unloading strategy to TransferShedder, and enables debug mode.
+ -
loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
+ -
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
+ - loadBalancerSheddingEnabled=false
+ - loadBalancerDebugModeEnabled=true
+ - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+ depends_on:
+ zookeeper:
+ condition: service_healthy
+ bookie:
+ condition: service_started
+ command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec
bin/pulsar broker"
\ No newline at end of file