This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ae6d1f  [feat] Support ExclusiveWithFencing producer access mode. (#201)
3ae6d1f is described below

commit 3ae6d1f68bb596f838ae549ca2f09473122d9e44
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Mar 21 11:10:57 2023 +0800

    [feat] Support ExclusiveWithFencing producer access mode. (#201)
    
    * [feat] Support ExclusiveWithFencing producer access mode.
    
    * Optimize unit test.
    
    * Fix unit test.
---
 include/pulsar/ProducerConfiguration.h |  8 ++++-
 tests/ProducerTest.cc                  | 65 ++++++++++++++++++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h
index e0c824c..62a6380 100644
--- a/include/pulsar/ProducerConfiguration.h
+++ b/include/pulsar/ProducerConfiguration.h
@@ -95,7 +95,13 @@ class PULSAR_PUBLIC ProducerConfiguration {
         /**
          * Producer creation is pending until it can acquire exclusive access.
          */
-        WaitForExclusive = 2
+        WaitForExclusive = 2,
+
+        /**
+         * Acquire exclusive access for the producer. Any existing producer will be removed and
+         * invalidated immediately.
+         */
+        ExclusiveWithFencing = 3
     };
 
     ProducerConfiguration();
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 7c99771..0a52563 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -334,6 +334,71 @@ TEST(ProducerTest, testWaitForExclusiveProducer) {
     producer2.close();
 }
 
+// An ExclusiveWithFencing producer must fence both the current exclusive producer and any queued waiter.
+TEST(ProducerTest, testExclusiveWithFencingProducer) {
+    Client client(serviceUrl);
+    std::string topicName =
+        "persistent://public/default/testExclusiveWithFencingProducer" + std::to_string(time(nullptr));
+
+    Producer producer1;
+    ProducerConfiguration producerConfiguration1;
+    producerConfiguration1.setProducerName("p-name-1");
+    producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive);
+
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1));
+    producer1.send(MessageBuilder().setContent("content").build());
+
+    Producer producer2;
+    ProducerConfiguration producerConfiguration2;
+    producerConfiguration2.setProducerName("p-name-2");
+    producerConfiguration2.setAccessMode(ProducerConfiguration::WaitForExclusive);
+
+    Latch latch(1);
+    client.createProducerAsync(topicName, producerConfiguration2,
+                               [&latch, &producer2](Result res, Producer producer) {
+                                   // producer2 is fenced; EXPECT (not ASSERT) so the latch is always released
+                                   EXPECT_EQ(ResultProducerFenced, res);
+                                   producer2 = producer;
+                                   latch.countdown();
+                               });
+    // wait for producer2 to be enqueued before producer3 is created, in order to prevent races
+    sleep(1);
+
+    // producer3 is created successfully and takes over the topic.
+    Producer producer3;
+    ProducerConfiguration producerConfiguration3;
+    producerConfiguration3.setProducerName("p-name-3");
+    producerConfiguration3.setAccessMode(ProducerConfiguration::ExclusiveWithFencing);
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration3, producer3));
+    ASSERT_EQ(ResultOk, producer3.send(MessageBuilder().setContent("content").build()));
+
+    latch.wait();
+    // producer1 has been fenced by producer3
+    ASSERT_EQ(ResultProducerFenced, producer1.send(MessageBuilder().setContent("content").build()));
+
+    // Create producer4 with WaitForExclusive, using its own configuration object
+    Producer producer4;
+    ProducerConfiguration producerConfiguration4;
+    producerConfiguration4.setProducerName("p-name-4");
+    producerConfiguration4.setAccessMode(ProducerConfiguration::WaitForExclusive);
+    Latch latch2(1);
+    client.createProducerAsync(topicName, producerConfiguration4,
+                               [&latch2, &producer4](Result res, Producer producer) {
+                                   // producer4 succeeds; EXPECT (not ASSERT) so the latch is always released
+                                   EXPECT_EQ(ResultOk, res);
+                                   producer4 = producer;
+                                   latch2.countdown();
+                               });
+    ASSERT_EQ(ResultProducerNotInitialized, producer4.send(MessageBuilder().setContent("content").build()));
+
+    // Once producer3 is closed, the pending producer4 is created successfully
+    producer3.close();
+    latch2.wait();
+    ASSERT_EQ(ResultOk, producer4.send(MessageBuilder().setContent("content").build()));
+
+    client.close();
+}
+
 TEST_P(ProducerTest, testFlushNoBatch) {
     Client client(serviceUrl);
 

Reply via email to