This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 12a03ce feat: support producer access mode. (#331)
12a03ce is described below
commit 12a03ce7fc1b26ba68f75aaf586d73aae9f9fbf9
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jun 15 21:41:55 2023 +0800
feat: support producer access mode. (#331)
---
index.d.ts | 7 ++++++
src/ProducerConfig.cc | 14 ++++++++++++
tests/producer.test.js | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 83 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index b394804..161dabc 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -63,6 +63,7 @@ export interface ProducerConfig {
cryptoFailureAction?: ProducerCryptoFailureAction;
chunkingEnabled?: boolean;
schema?: SchemaInfo;
+ accessMode?: ProducerAccessMode;
}
export class Producer {
@@ -267,3 +268,9 @@ export type SchemaType =
'Bytes' |
'AutoConsume' |
'AutoPublish';
+
+export type ProducerAccessMode =
+ 'Shared' |
+ 'Exclusive' |
+ 'WaitForExclusive' |
+ 'ExclusiveWithFencing';
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 52d7707..120eebf 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -39,6 +39,7 @@ static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
+static const std::string CFG_ACCESS_MODE = "accessMode";
static const std::map<std::string, pulsar_partitions_routing_mode> MESSAGE_ROUTING_MODE = {
{"UseSinglePartition", pulsar_UseSinglePartition},
@@ -63,6 +64,13 @@ static std::map<std::string, pulsar_producer_crypto_failure_action> PRODUCER_CRY
{"SEND", pulsar_ProducerSend},
};
+static std::map<std::string, pulsar_producer_access_mode> PRODUCER_ACCESS_MODE = {
+ {"Shared", pulsar_ProducerAccessModeShared},
+ {"Exclusive", pulsar_ProducerAccessModeExclusive},
+ {"WaitForExclusive", pulsar_ProducerAccessModeWaitForExclusive},
+ {"ExclusiveWithFencing", pulsar_ProducerAccessModeExclusiveWithFencing},
+};
+
ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
pulsar_producer_configuration_create(),
pulsar_producer_configuration_free);
@@ -194,6 +202,12 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
bool chunkingEnabled = producerConfig.Get(CFG_CHUNK_ENABLED).ToBoolean().Value();
pulsar_producer_configuration_set_chunking_enabled(this->cProducerConfig.get(), chunkingEnabled);
}
+
+ std::string accessMode = producerConfig.Get(CFG_ACCESS_MODE).ToString().Utf8Value();
+ if (PRODUCER_ACCESS_MODE.count(accessMode)) {
+ pulsar_producer_configuration_set_access_mode(this->cProducerConfig.get(),
+ PRODUCER_ACCESS_MODE.at(accessMode));
+ }
}
ProducerConfig::~ProducerConfig() {}
diff --git a/tests/producer.test.js b/tests/producer.test.js
index fa7710b..fbce743 100644
--- a/tests/producer.test.js
+++ b/tests/producer.test.js
@@ -94,5 +94,67 @@ const Pulsar = require('../index.js');
await producer.close();
});
});
+ describe('Access Mode', () => {
+ test('Exclusive', async () => {
+ const topicName = 'test-access-mode-exclusive';
+ const producer1 = await client.createProducer({
+ topic: topicName,
+ producerName: 'p-1',
+ accessMode: 'Exclusive',
+ });
+ expect(producer1.getProducerName()).toBe('p-1');
+
+ await expect(client.createProducer({
+ topic: topicName,
+ producerName: 'p-2',
+ accessMode: 'Exclusive',
+ })).rejects.toThrow('Failed to create producer: ResultProducerFenced');
+
+ await producer1.close();
+ });
+
+ test('WaitForExclusive', async () => {
+ const topicName = 'test-access-mode-wait-for-exclusive';
+ const producer1 = await client.createProducer({
+ topic: topicName,
+ producerName: 'p-1',
+ accessMode: 'Exclusive',
+ });
+ expect(producer1.getProducerName()).toBe('p-1');
+ // async close producer1
+ producer1.close();
+ // when p1 close, p2 success created.
+ const producer2 = await client.createProducer({
+ topic: topicName,
+ producerName: 'p-2',
+ accessMode: 'WaitForExclusive',
+ });
+ expect(producer2.getProducerName()).toBe('p-2');
+ await producer2.close();
+ });
+
+ test('ExclusiveWithFencing', async () => {
+ const topicName = 'test-access-mode';
+ const producer1 = await client.createProducer({
+ topic: topicName,
+ producerName: 'p-1',
+ accessMode: 'Exclusive',
+ });
+ expect(producer1.getProducerName()).toBe('p-1');
+ const producer2 = await client.createProducer({
+ topic: topicName,
+ producerName: 'p-2',
+ accessMode: 'ExclusiveWithFencing',
+ });
+ expect(producer2.getProducerName()).toBe('p-2');
+ // producer1 will be fenced.
+ await expect(
+ producer1.send({
+ data: Buffer.from('test-msg'),
+ }),
+ ).rejects.toThrow('Failed to send message: ResultProducerFenced');
+ await producer2.close();
+ });
+ });
});
})();