This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 3ae6d1f [feat] Support ExclusiveWithFencing producer access mode.
(#201)
3ae6d1f is described below
commit 3ae6d1f68bb596f838ae549ca2f09473122d9e44
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Mar 21 11:10:57 2023 +0800
[feat] Support ExclusiveWithFencing producer access mode. (#201)
* [feat] Support ExclusiveWithFencing producer access mode.
* Optimize unit test.
* Fix unit test.
---
include/pulsar/ProducerConfiguration.h | 8 ++++-
tests/ProducerTest.cc | 65 ++++++++++++++++++++++++++++++++++
2 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/include/pulsar/ProducerConfiguration.h
b/include/pulsar/ProducerConfiguration.h
index e0c824c..62a6380 100644
--- a/include/pulsar/ProducerConfiguration.h
+++ b/include/pulsar/ProducerConfiguration.h
@@ -95,7 +95,13 @@ class PULSAR_PUBLIC ProducerConfiguration {
/**
* Producer creation is pending until it can acquire exclusive access.
*/
- WaitForExclusive = 2
+ WaitForExclusive = 2,
+
+ /**
+ * Acquire exclusive access for the producer. Any existing producer
will be removed and
+ * invalidated immediately.
+ */
+ ExclusiveWithFencing = 3
};
ProducerConfiguration();
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 7c99771..0a52563 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -334,6 +334,71 @@ TEST(ProducerTest, testWaitForExclusiveProducer) {
producer2.close();
}
+TEST(ProducerTest, testExclusiveWithFencingProducer) {
+ Client client(serviceUrl);
+
+ std::string topicName =
+ "persistent://public/default/testExclusiveWithFencingProducer" +
std::to_string(time(nullptr));
+
+ Producer producer1;
+ ProducerConfiguration producerConfiguration1;
+ producerConfiguration1.setProducerName("p-name-1");
+ producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive);
+
+ ASSERT_EQ(ResultOk, client.createProducer(topicName,
producerConfiguration1, producer1));
+ ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("content").build()));
+
+ Producer producer2;
+ ProducerConfiguration producerConfiguration2;
+ producerConfiguration2.setProducerName("p-name-2");
+
producerConfiguration2.setAccessMode(ProducerConfiguration::WaitForExclusive);
+
+ Latch latch(1);
+ client.createProducerAsync(topicName, producerConfiguration2,
+ [&latch, &producer2](Result res, Producer
producer) {
+ // producer2 is expected to be fenced; EXPECT (not ASSERT) so the latch is always released
+ EXPECT_EQ(ResultProducerFenced, res);
+ producer2 = producer;
+ latch.countdown();
+ });
+ // wait for all the Producers to be enqueued in order to prevent races
+ sleep(1);
+
+ // producer3 uses ExclusiveWithFencing, so its creation is expected to succeed.
+ Producer producer3;
+ ProducerConfiguration producerConfiguration3;
+ producerConfiguration3.setProducerName("p-name-3");
+
producerConfiguration3.setAccessMode(ProducerConfiguration::ExclusiveWithFencing);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName,
producerConfiguration3, producer3));
+ ASSERT_EQ(ResultOk,
producer3.send(MessageBuilder().setContent("content").build()));
+
+ latch.wait();
+ // producer1 will be fenced
+ ASSERT_EQ(ResultProducerFenced,
producer1.send(MessageBuilder().setContent("content").build()));
+
+ // Create producer4 with WaitForExclusive, using its own configuration object
+ Producer producer4;
+ ProducerConfiguration producerConfiguration4;
+ producerConfiguration4.setProducerName("p-name-4");
+
producerConfiguration4.setAccessMode(ProducerConfiguration::WaitForExclusive);
+ Latch latch2(1);
+ client.createProducerAsync(topicName, producerConfiguration4,
+ [&latch2, &producer4](Result res, Producer
producer) {
+ // producer4 is expected to succeed; EXPECT (not ASSERT) so latch2 is always released
+ EXPECT_EQ(ResultOk, res);
+ producer4 = producer;
+ latch2.countdown();
+ });
+ ASSERT_EQ(ResultProducerNotInitialized,
producer4.send(MessageBuilder().setContent("content").build()));
+
+ // Once producer3 is closed, the creation of producer4 is expected to complete successfully
+ producer3.close();
+ latch2.wait();
+ ASSERT_EQ(ResultOk,
producer4.send(MessageBuilder().setContent("content").build()));
+
+ client.close();
+}
+
TEST_P(ProducerTest, testFlushNoBatch) {
Client client(serviceUrl);