This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d1dd08b  [feat] PIP-307 added assigned broker urls for CloseProducer 
and CloseConsumer commands and handler logic (#389)
d1dd08b is described below

commit d1dd08ba3d9c964506ead1a9e3cde0cccad4621b
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Jan 24 06:39:17 2024 -0800

    [feat] PIP-307 added assigned broker urls for CloseProducer and 
CloseConsumer commands and handler logic (#389)
    
    Master Issue: 
[PIP-307](https://github.com/apache/pulsar/blob/master/pip/pip-307.md)
    
    ### Motivation
    
    As part of PIP-307, this change adds the client-side logic to the C++ client.
    
    ### Modifications
    
    - Added assigned broker URLs to the CloseProducer and CloseConsumer commands.
    - Updated the client reconnection logic to connect directly to the assigned 
broker URLs.
    
    ### Verifying this change
    
    - Added ExtensibleLoadManagerTest to cover this logic
---
 lib/BinaryProtoLookupService.cc                 |   4 +-
 lib/ClientConnection.cc                         |  30 +++-
 lib/ClientConnection.h                          |   4 +
 lib/ClientImpl.cc                               |  35 ++++-
 lib/ClientImpl.h                                |   7 +
 lib/ConsumerImpl.cc                             |   9 +-
 lib/ConsumerImpl.h                              |   1 +
 lib/HandlerBase.cc                              |  29 ++--
 lib/HandlerBase.h                               |  17 +-
 lib/LookupService.h                             |   1 +
 lib/ProducerImpl.cc                             |   9 +-
 lib/ProducerImpl.h                              |   1 +
 proto/PulsarApi.proto                           |   4 +
 run-unit-tests.sh                               |   8 +
 tests/BuildTests.cmake                          |   3 +
 tests/LegacyBuildTests.cmake                    |   3 +
 tests/extensibleLM/ExtensibleLoadManagerTest.cc | 198 ++++++++++++++++++++++++
 tests/extensibleLM/docker-compose.yml           | 150 ++++++++++++++++++
 18 files changed, 492 insertions(+), 21 deletions(-)

diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index 87f02ea..2d9ffc4 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -87,9 +87,9 @@ auto BinaryProtoLookupService::findBroker(const std::string& 
address, bool autho
                                                 << ", from " << 
cnx->cnxString());
                 if (data->shouldProxyThroughServiceUrl()) {
                     // logicalAddress is the proxy's address, we should still 
connect through proxy
-                    promise->setValue({responseBrokerAddress, address});
+                    promise->setValue({responseBrokerAddress, address, true});
                 } else {
-                    promise->setValue({responseBrokerAddress, 
responseBrokerAddress});
+                    promise->setValue({responseBrokerAddress, 
responseBrokerAddress, false});
                 }
             }
         });
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index b2916bd..0641809 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1762,6 +1762,30 @@ void ClientConnection::handleError(const 
proto::CommandError& error) {
     }
 }
 
+boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+    const proto::CommandCloseProducer& closeProducer) {
+    if (tlsSocket_) {
+        if (closeProducer.has_assignedbrokerserviceurltls()) {
+            return closeProducer.assignedbrokerserviceurltls();
+        }
+    } else if (closeProducer.has_assignedbrokerserviceurl()) {
+        return closeProducer.assignedbrokerserviceurl();
+    }
+    return boost::none;
+}
+
+boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+    const proto::CommandCloseConsumer& closeConsumer) {
+    if (tlsSocket_) {
+        if (closeConsumer.has_assignedbrokerserviceurltls()) {
+            return closeConsumer.assignedbrokerserviceurltls();
+        }
+    } else if (closeConsumer.has_assignedbrokerserviceurl()) {
+        return closeConsumer.assignedbrokerserviceurl();
+    }
+    return boost::none;
+}
+
 void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& 
closeProducer) {
     int producerId = closeProducer.producer_id();
 
@@ -1775,7 +1799,8 @@ void ClientConnection::handleCloseProducer(const 
proto::CommandCloseProducer& cl
         lock.unlock();
 
         if (producer) {
-            producer->disconnectProducer();
+            auto assignedBrokerServiceUrl = 
getAssignedBrokerServiceUrl(closeProducer);
+            producer->disconnectProducer(assignedBrokerServiceUrl);
         }
     } else {
         LOG_ERROR(cnxString_ << "Got invalid producer Id in closeProducer 
command: " << producerId);
@@ -1795,7 +1820,8 @@ void ClientConnection::handleCloseConsumer(const 
proto::CommandCloseConsumer& cl
         lock.unlock();
 
         if (consumer) {
-            consumer->disconnectConsumer();
+            auto assignedBrokerServiceUrl = 
getAssignedBrokerServiceUrl(closeconsumer);
+            consumer->disconnectConsumer(assignedBrokerServiceUrl);
         }
     } else {
         LOG_ERROR(cnxString_ << "Got invalid consumer Id in closeConsumer 
command: " << consumerId);
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 1d44f05..851ec0c 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -421,6 +421,10 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     void handleGetTopicOfNamespaceResponse(const 
proto::CommandGetTopicsOfNamespaceResponse&);
     void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
     void handleAckResponse(const proto::CommandAckResponse&);
+    boost::optional<std::string> getAssignedBrokerServiceUrl(
+        const proto::CommandCloseProducer& closeProducer);
+    boost::optional<std::string> getAssignedBrokerServiceUrl(
+        const proto::CommandCloseConsumer& closeConsumer);
 };
 }  // namespace pulsar
 
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 76d4389..63c85b0 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -91,7 +91,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration&
             ClientImpl::getClientVersion(clientConfiguration)),
       producerIdGenerator_(0),
       consumerIdGenerator_(0),
-      closingError(ResultOk) {
+      closingError(ResultOk),
+      useProxy_(false),
+      lookupCount_(0L) {
     std::unique_ptr<LoggerFactory> loggerFactory = 
clientConfiguration_.impl_->takeLogger();
     if (loggerFactory) {
         LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -532,6 +534,8 @@ Future<Result, ClientConnectionPtr> 
ClientImpl::getConnection(const std::string&
                 promise.setFailed(result);
                 return;
             }
+            useProxy_ = data.proxyThroughServiceUrl;
+            lookupCount_++;
             pool_.getConnectionAsync(data.logicalAddress, 
data.physicalAddress, key)
                 .addListener([promise](Result result, const 
ClientConnectionWeakPtr& weakCnx) {
                     if (result == ResultOk) {
@@ -550,6 +554,33 @@ Future<Result, ClientConnectionPtr> 
ClientImpl::getConnection(const std::string&
     return promise.getFuture();
 }
 
+const std::string& ClientImpl::getPhysicalAddress(const std::string& 
logicalAddress) {
+    if (useProxy_) {
+        return serviceNameResolver_.resolveHost();
+    } else {
+        return logicalAddress;
+    }
+}
+
+Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& 
logicalAddress, size_t key) {
+    const auto& physicalAddress = getPhysicalAddress(logicalAddress);
+    Promise<Result, ClientConnectionPtr> promise;
+    pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
+        .addListener([promise](Result result, const ClientConnectionWeakPtr& 
weakCnx) {
+            if (result == ResultOk) {
+                auto cnx = weakCnx.lock();
+                if (cnx) {
+                    promise.setValue(cnx);
+                } else {
+                    promise.setFailed(ResultConnectError);
+                }
+            } else {
+                promise.setFailed(result);
+            }
+        });
+    return promise.getFuture();
+}
+
 void ClientImpl::handleGetPartitions(const Result result, const 
LookupDataResultPtr partitionMetadata,
                                      TopicNamePtr topicName, 
GetPartitionsCallback callback) {
     if (result != ResultOk) {
@@ -635,6 +666,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
     if (*numberOfOpenHandlers == 0 && callback) {
         handleClose(ResultOk, numberOfOpenHandlers, callback);
     }
+    lookupCount_ = 0;
 }
 
 void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, 
ResultCallback callback) {
@@ -722,6 +754,7 @@ void ClientImpl::shutdown() {
     
partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
     timeoutProcessor.tok();
     LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
+    lookupCount_ = 0;
 }
 
 uint64_t ClientImpl::newProducerId() {
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 762aa60..a2649a6 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -97,6 +97,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
 
     Future<Result, ClientConnectionPtr> getConnection(const std::string& 
topic, size_t key);
 
+    Future<Result, ClientConnectionPtr> connect(const std::string& 
logicalAddress, size_t key);
+
     void closeAsync(CloseCallback callback);
     void shutdown();
 
@@ -124,6 +126,7 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { 
return requestIdGenerator_; }
 
     ConnectionPool& getConnectionPool() noexcept { return pool_; }
+    uint64_t getLookupCount() { return lookupCount_; }
 
     static std::chrono::nanoseconds getOperationTimeout(const 
ClientConfiguration& clientConfiguration);
 
@@ -160,6 +163,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
                                           const std::string& consumerName, 
const ConsumerConfiguration& conf,
                                           SubscribeCallback callback);
 
+    const std::string& getPhysicalAddress(const std::string& logicalAddress);
+
     static std::string getClientVersion(const ClientConfiguration& 
clientConfiguration);
 
     enum State
@@ -191,6 +196,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
 
     std::atomic<Result> closingError;
+    std::atomic<bool> useProxy_;
+    std::atomic<uint64_t> lookupCount_;
 
     friend class Client;
 };
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index dbd3b65..86dddb0 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1234,10 +1234,13 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& 
messageId) {
     negativeAcksTracker_->add(messageId);
 }
 
-void ConsumerImpl::disconnectConsumer() {
-    LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
+void ConsumerImpl::disconnectConsumer() { disconnectConsumer(boost::none); }
+
+void ConsumerImpl::disconnectConsumer(const boost::optional<std::string>& 
assignedBrokerUrl) {
+    LOG_INFO("Broker notification of Closed consumer: "
+             << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + 
assignedBrokerUrl.get()) : ""));
     resetCnx();
-    scheduleReconnection();
+    scheduleReconnection(assignedBrokerUrl);
 }
 
 void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 0f801f6..5612091 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -131,6 +131,7 @@ class ConsumerImpl : public ConsumerImplBase {
     void hasMessageAvailableAsync(HasMessageAvailableCallback callback) 
override;
 
     virtual void disconnectConsumer();
+    virtual void disconnectConsumer(const boost::optional<std::string>& 
assignedBrokerUrl);
     Result fetchSingleMessageFromBroker(Message& msg);
 
     virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index fa21a57..c5327fc 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -68,7 +68,18 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
     connection_ = cnx;
 }
 
-void HandlerBase::grabCnx() {
+void HandlerBase::grabCnx() { grabCnx(boost::none); }
+
+Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
+    const ClientImplPtr& client, const boost::optional<std::string>& 
assignedBrokerUrl) {
+    if (assignedBrokerUrl && client->getLookupCount() > 0) {
+        return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
+    } else {
+        return client->getConnection(topic(), connectionKeySuffix_);
+    }
+}
+
+void HandlerBase::grabCnx(const boost::optional<std::string>& 
assignedBrokerUrl) {
     bool expectedState = false;
     if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
         LOG_INFO(getName() << "Ignoring reconnection attempt since there's 
already a pending reconnection");
@@ -90,7 +101,7 @@ void HandlerBase::grabCnx() {
         return;
     }
     auto self = shared_from_this();
-    auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
+    auto cnxFuture = getConnection(client, assignedBrokerUrl);
     cnxFuture.addListener([this, self](Result result, const 
ClientConnectionPtr& cnx) {
         if (result == ResultOk) {
             LOG_DEBUG(getName() << "Connected to broker: " << 
cnx->cnxString());
@@ -141,12 +152,12 @@ void HandlerBase::handleDisconnection(Result result, 
const ClientConnectionPtr&
             break;
     }
 }
-
-void HandlerBase::scheduleReconnection() {
+void HandlerBase::scheduleReconnection() { scheduleReconnection(boost::none); }
+void HandlerBase::scheduleReconnection(const boost::optional<std::string>& 
assignedBrokerUrl) {
     const auto state = state_.load();
 
     if (state == Pending || state == Ready) {
-        TimeDuration delay = backoff_.next();
+        TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) 
: backoff_.next();
 
         LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) 
/ 1000.0) << " s");
         timer_->expires_from_now(delay);
@@ -154,10 +165,10 @@ void HandlerBase::scheduleReconnection() {
         // so we will not run into the case where grabCnx is invoked on out of 
scope handler
         auto name = getName();
         std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
-        timer_->async_wait([name, weakSelf](const ASIO_ERROR& ec) {
+        timer_->async_wait([name, weakSelf, assignedBrokerUrl](const 
ASIO_ERROR& ec) {
             auto self = weakSelf.lock();
             if (self) {
-                self->handleTimeout(ec);
+                self->handleTimeout(ec, assignedBrokerUrl);
             } else {
                 LOG_WARN(name << "Cancel the reconnection since the handler is 
destroyed");
             }
@@ -165,13 +176,13 @@ void HandlerBase::scheduleReconnection() {
     }
 }
 
-void HandlerBase::handleTimeout(const ASIO_ERROR& ec) {
+void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const 
boost::optional<std::string>& assignedBrokerUrl) {
     if (ec) {
         LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec 
<< "]");
         return;
     } else {
         epoch_++;
-        grabCnx();
+        grabCnx(assignedBrokerUrl);
     }
 }
 
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 68c0b6a..b9d9855 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -20,6 +20,7 @@
 #define _PULSAR_HANDLER_BASE_HEADER_
 #include <pulsar/Result.h>
 
+#include <boost/optional.hpp>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -53,11 +54,22 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
     void resetCnx() { setCnx(nullptr); }
 
    protected:
+    /*
+     * tries reconnection and sets connection_ to valid object
+     * @param assignedBrokerUrl assigned broker url to directly connect to 
without lookup
+     */
+    void grabCnx(const boost::optional<std::string>& assignedBrokerUrl);
+
     /*
      * tries reconnection and sets connection_ to valid object
      */
     void grabCnx();
 
+    /*
+     * Schedule reconnection after backoff time
+     * @param assignedBrokerUrl assigned broker url to directly connect to 
without lookup
+     */
+    void scheduleReconnection(const boost::optional<std::string>& 
assignedBrokerUrl);
     /*
      * Schedule reconnection after backoff time
      */
@@ -89,9 +101,12 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
    private:
     const std::shared_ptr<std::string> topic_;
 
+    Future<Result, ClientConnectionPtr> getConnection(const ClientImplPtr& 
client,
+                                                      const 
boost::optional<std::string>& assignedBrokerUrl);
+
     void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
 
-    void handleTimeout(const ASIO_ERROR& ec);
+    void handleTimeout(const ASIO_ERROR& ec, const 
boost::optional<std::string>& assignedBrokerUrl);
 
    protected:
     ClientImplWeakPtr client_;
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 35f4730..b50d1f8 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -42,6 +42,7 @@ class LookupService {
     struct LookupResult {
         std::string logicalAddress;
         std::string physicalAddress;
+        bool proxyThroughServiceUrl;
 
         friend std::ostream& operator<<(std::ostream& os, const LookupResult& 
lookupResult) {
             return os << "logical address: " << lookupResult.logicalAddress
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index fc39b23..2e5cd44 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -971,12 +971,15 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& 
metadata, SharedBuffer
                                encryptedPayload);
 }
 
-void ProducerImpl::disconnectProducer() {
-    LOG_INFO("Broker notification of Closed producer: " << producerId_);
+void ProducerImpl::disconnectProducer(const boost::optional<std::string>& 
assignedBrokerUrl) {
+    LOG_INFO("Broker notification of Closed producer: "
+             << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + 
assignedBrokerUrl.get()) : ""));
     resetCnx();
-    scheduleReconnection();
+    scheduleReconnection(assignedBrokerUrl);
 }
 
+void ProducerImpl::disconnectProducer() { disconnectProducer(boost::none); }
+
 void ProducerImpl::start() {
     HandlerBase::start();
 
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 9781605..f650b1f 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -98,6 +98,7 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
 
     bool ackReceived(uint64_t sequenceId, MessageId& messageId);
 
+    virtual void disconnectProducer(const boost::optional<std::string>& 
assignedBrokerUrl);
     virtual void disconnectProducer();
 
     uint64_t getProducerId() const;
diff --git a/proto/PulsarApi.proto b/proto/PulsarApi.proto
index 7be6522..a2548f3 100644
--- a/proto/PulsarApi.proto
+++ b/proto/PulsarApi.proto
@@ -623,11 +623,15 @@ message CommandReachedEndOfTopic {
 message CommandCloseProducer {
     required uint64 producer_id = 1;
     required uint64 request_id = 2;
+    optional string assignedBrokerServiceUrl = 3;
+    optional string assignedBrokerServiceUrlTls = 4;
 }
 
 message CommandCloseConsumer {
     required uint64 consumer_id = 1;
     required uint64 request_id = 2;
+    optional string assignedBrokerServiceUrl = 3;
+    optional string assignedBrokerServiceUrlTls = 4;
 }
 
 message CommandRedeliverUnacknowledgedMessages {
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 898cfed..226789c 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -30,6 +30,14 @@ fi
 export http_proxy=
 export https_proxy=
 
+
+# Run ExtensibleLoadManager tests
+docker compose -f tests/extensibleLM/docker-compose.yml up -d
+until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
+sleep 5
+$CMAKE_BUILD_DIRECTORY/tests/ExtensibleLoadManagerTest
+docker compose -f tests/extensibleLM/docker-compose.yml down
+
 # Run OAuth2 tests
 docker compose -f tests/oauth2/docker-compose.yml up -d
 # Wait until the namespace is created, currently there is no good way to check 
it
diff --git a/tests/BuildTests.cmake b/tests/BuildTests.cmake
index c468f51..0fe7430 100644
--- a/tests/BuildTests.cmake
+++ b/tests/BuildTests.cmake
@@ -55,3 +55,6 @@ target_link_libraries(Oauth2Test pulsarStatic 
${GTEST_TARGETS})
 
 add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
 target_link_libraries(ChunkDedupTest pulsarStatic ${GTEST_TARGETS})
+
+add_executable(ExtensibleLoadManagerTest 
extensibleLM/ExtensibleLoadManagerTest.cc HttpHelper.cc)
+target_link_libraries(ExtensibleLoadManagerTest PRIVATE pulsarStatic 
${GTEST_TARGETS})
diff --git a/tests/LegacyBuildTests.cmake b/tests/LegacyBuildTests.cmake
index 1e5335f..c6f4e96 100644
--- a/tests/LegacyBuildTests.cmake
+++ b/tests/LegacyBuildTests.cmake
@@ -75,3 +75,6 @@ target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic 
${GTEST_LIBRARY_PAT
 add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
 target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic 
${GTEST_LIBRARY_PATH})
 
+add_executable(ExtensibleLoadManagerTest 
extensibleLM/ExtensibleLoadManagerTest.cc HttpHelper.cc)
+target_include_directories(ExtensibleLoadManagerTest PRIVATE 
${AUTOGEN_DIR}/lib)
+target_link_libraries(ExtensibleLoadManagerTest PRIVATE pulsarStatic 
${GTEST_LIBRARY_PATH})
diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc 
b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
new file mode 100644
index 0000000..4472d9d
--- /dev/null
+++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Run `docker-compose up -d` to set up the test environment for this test.
+#include <gtest/gtest.h>
+
+#include <thread>
+
+#include "include/pulsar/Client.h"
+#include "lib/LogUtils.h"
+#include "lib/Semaphore.h"
+#include "tests/HttpHelper.h"
+#include "tests/PulsarFriend.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+bool checkTime() {
+    const static auto start = std::chrono::high_resolution_clock::now();
+    auto end = std::chrono::high_resolution_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end 
- start).count();
+    return duration < 180 * 1000;
+}
+
+TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
+    const static std::string adminUrl = "http://localhost:8080/";;
+    const static std::string topicName =
+        "persistent://public/unload-test/topic-1" + std::to_string(time(NULL));
+
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+        std::string url = adminUrl + 
"admin/v2/namespaces/public/unload-test?bundles=1";
+        int res = makePutRequest(url, "");
+        return res == 204 || res == 409;
+    }));
+
+    Client client{"pulsar://localhost:6650"};
+    Producer producer;
+    ProducerConfiguration producerConfiguration;
+    Result producerResult = client.createProducer(topicName, 
producerConfiguration, producer);
+    ASSERT_EQ(producerResult, ResultOk);
+    Consumer consumer;
+    Result consumerResult = client.subscribe(topicName, "sub", consumer);
+    ASSERT_EQ(consumerResult, ResultOk);
+
+    Semaphore firstUnloadSemaphore(0);
+    Semaphore secondUnloadSemaphore(0);
+    Semaphore halfPubWaitSemaphore(0);
+    const int msgCount = 10;
+    int produced = 0;
+    auto produce = [&]() {
+        int i = 0;
+        while (i < msgCount && checkTime()) {
+            if (i == 3) {
+                firstUnloadSemaphore.acquire();
+            }
+
+            if (i == 5) {
+                halfPubWaitSemaphore.release();
+            }
+
+            if (i == 8) {
+                secondUnloadSemaphore.acquire();
+            }
+
+            std::string content = std::to_string(i);
+            const auto msg = MessageBuilder().setContent(content).build();
+
+            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+                Result sendResult = producer.send(msg);
+                return sendResult == ResultOk;
+            }));
+
+            LOG_INFO("produced index:" << i);
+            produced++;
+            i++;
+        }
+        LOG_INFO("producer finished");
+    };
+
+    int consumed = 0;
+    auto consume = [&]() {
+        Message receivedMsg;
+        int i = 0;
+        while (i < msgCount && checkTime()) {
+            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+                Result receiveResult =
+                    consumer.receive(receivedMsg, 1000);  // Assumed that we 
wait 1000 ms for each message
+                return receiveResult == ResultOk;
+            }));
+            LOG_INFO("received index:" << i);
+
+            int id = std::stoi(receivedMsg.getDataAsString());
+            if (id < i) {
+                continue;
+            }
+            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+                Result ackResult = consumer.acknowledge(receivedMsg);
+                return ackResult == ResultOk;
+            }));
+            LOG_INFO("acked index:" << i);
+
+            consumed++;
+            i++;
+        }
+        LOG_INFO("consumer finished");
+    };
+
+    std::thread produceThread(produce);
+    std::thread consumeThread(consume);
+
+    auto unload = [&] {
+        auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+        auto &producerImpl = PulsarFriend::getProducerImpl(producer);
+        uint64_t lookupCountBeforeUnload;
+        std::string destinationBroker;
+        while (checkTime()) {
+            // make sure producers and consumers are ready
+            ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
+                                  [&] { return consumerImpl.isConnected() && 
producerImpl.isConnected(); }));
+
+            std::string url = adminUrl + 
"lookup/v2/topic/persistent/public/unload-test/topic-1";
+            std::string responseDataBeforeUnload;
+            int res = makeGetRequest(url, responseDataBeforeUnload);
+            if (res != 200) {
+                continue;
+            }
+            destinationBroker = responseDataBeforeUnload.find("broker-2") == 
std::string::npos
+                                    ? "broker-2:8080"
+                                    : "broker-1:8080";
+            lookupCountBeforeUnload = clientImplPtr->getLookupCount();
+            ASSERT_TRUE(lookupCountBeforeUnload > 0);
+
+            url = adminUrl +
+                  
"admin/v2/namespaces/public/unload-test/0x00000000_0xffffffff/unload?destinationBroker="
 +
+                  destinationBroker;
+            LOG_INFO("before lookup responseData:" << responseDataBeforeUnload 
<< ",unload url:" << url
+                                                   << 
",lookupCountBeforeUnload:" << lookupCountBeforeUnload);
+            res = makePutRequest(url, "");
+            LOG_INFO("unload res:" << res);
+            if (res != 204) {
+                continue;
+            }
+
+            // make sure producers and consumers are ready
+            ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
+                                  [&] { return consumerImpl.isConnected() && 
producerImpl.isConnected(); }));
+            std::string responseDataAfterUnload;
+            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
+                url = adminUrl + 
"lookup/v2/topic/persistent/public/unload-test/topic-1";
+                res = makeGetRequest(url, responseDataAfterUnload);
+                return res == 200 && 
responseDataAfterUnload.find(destinationBroker) != std::string::npos;
+            }));
+            LOG_INFO("after lookup responseData:" << responseDataAfterUnload 
<< ",res:" << res);
+
+            // TODO: check lookup counter after pip-307 is released
+            auto lookupCountAfterUnload = clientImplPtr->getLookupCount();
+            ASSERT_TRUE(lookupCountBeforeUnload < lookupCountAfterUnload);
+            break;
+        }
+    };
+    LOG_INFO("starting first unload");
+    unload();
+    firstUnloadSemaphore.release();
+    halfPubWaitSemaphore.acquire();
+    LOG_INFO("starting second unload");
+    unload();
+    secondUnloadSemaphore.release();
+
+    produceThread.join();
+    consumeThread.join();
+    ASSERT_EQ(consumed, msgCount);
+    ASSERT_EQ(produced, msgCount);
+    ASSERT_TRUE(checkTime()) << "timed out";
+    client.close();
+}
+
+int main(int argc, char *argv[]) {
+    ::testing::InitGoogleTest(&argc, argv);
+
+    return RUN_ALL_TESTS();
+}
diff --git a/tests/extensibleLM/docker-compose.yml 
b/tests/extensibleLM/docker-compose.yml
new file mode 100644
index 0000000..8d3c33a
--- /dev/null
+++ b/tests/extensibleLM/docker-compose.yml
@@ -0,0 +1,150 @@
+version: '3'
+networks:
+  pulsar:
+    driver: bridge
+services:
+  # Start ZooKeeper
+  zookeeper:
+    image: apachepulsar/pulsar:latest
+    container_name: zookeeper
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    command: >
+      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
+             bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
+             exec bin/pulsar zookeeper"
+    healthcheck:
+      test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
+      interval: 10s
+      timeout: 5s
+      retries: 30
+
+  # Initialize cluster metadata
+  pulsar-init:
+    container_name: pulsar-init
+    hostname: pulsar-init
+    image: apachepulsar/pulsar:latest
+    networks:
+      - pulsar
+    environment:
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    command: >
+      bin/pulsar initialize-cluster-metadata \
+               --cluster cluster-a \
+               --zookeeper zookeeper:2181 \
+               --configuration-store zookeeper:2181 \
+               --web-service-url http://broker-1:8080 \
+               --broker-service-url pulsar://broker-1:6650
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+
+  # Start bookie
+  bookie:
+    image: apachepulsar/pulsar:latest
+    container_name: bookie
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - clusterName=cluster-a
+      - zkServers=zookeeper:2181
+      - metadataServiceUri=metadata-store:zk:zookeeper:2181
+      - advertisedAddress=bookie
+      - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      pulsar-init:
+        condition: service_completed_successfully
+    command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && 
exec bin/pulsar bookie"
+
+  proxy:
+    image: apachepulsar/pulsar:latest
+    container_name: proxy
+    hostname: proxy
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    ports:
+      - "8080:8080"
+      - "6650:6650"
+    depends_on:
+      broker-1:
+        condition: service_started
+      broker-2:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec 
bin/pulsar proxy"
+
+  # Start broker 1
+  broker-1:
+    image: apachepulsar/pulsar:latest
+    container_name: broker-1
+    hostname: broker-1
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - managedLedgerDefaultEnsembleSize=1
+      - managedLedgerDefaultWriteQuorum=1
+      - managedLedgerDefaultAckQuorum=1
+      - advertisedAddress=broker-1
+      - internalListenerName=internal
+      - advertisedListeners=internal:pulsar://broker-1:6650
+      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+      # Load Manager. Here uses the extensible load balancer, sets the 
unloading strategy to TransferShedder, and enables debug mode.
+      - 
loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
+      - 
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
+      - loadBalancerSheddingEnabled=false
+      - loadBalancerDebugModeEnabled=true
+      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      bookie:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec 
bin/pulsar broker"
+
+  # Start broker 2
+  broker-2:
+    image: apachepulsar/pulsar:latest
+    container_name: broker-2
+    hostname: broker-2
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - managedLedgerDefaultEnsembleSize=1
+      - managedLedgerDefaultWriteQuorum=1
+      - managedLedgerDefaultAckQuorum=1
+      - advertisedAddress=broker-2
+      - internalListenerName=internal
+      - advertisedListeners=internal:pulsar://broker-2:6650
+      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+      # Load Manager. Here uses the extensible load balancer, sets the 
unloading strategy to TransferShedder, and enables debug mode.
+      - 
loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
+      - 
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
+      - loadBalancerSheddingEnabled=false
+      - loadBalancerDebugModeEnabled=true
+      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      bookie:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec 
bin/pulsar broker"
\ No newline at end of file


Reply via email to