This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 544e809 Support subscriptionInitialPosition setting (#66)
544e809 is described below
commit 544e809b2642cc689f35f3d52bedf2d5ab7a64c8
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Mon Feb 3 10:03:33 2020 +0900
Support subscriptionInitialPosition setting (#66)
---
src/ConsumerConfig.cc | 12 +++++++++
tests/end_to_end.test.js | 69 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 81 insertions(+)
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 7ef2b79..b687b98 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -27,6 +27,7 @@
static const std::string CFG_TOPIC = "topic";
static const std::string CFG_SUBSCRIPTION = "subscription";
static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
+static const std::string CFG_INIT_POSITION = "subscriptionInitialPosition";
static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
@@ -40,6 +41,9 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Shared", pulsar_ConsumerShared},
{"Failover", pulsar_ConsumerFailover}};
+static const std::map<std::string, initial_position> INIT_POSITION = {
+ {"Latest", initial_position_latest}, {"Earliest", initial_position_earliest}};
+
struct MessageListenerProxyData {
std::shared_ptr<CConsumerWrapper> consumerWrapper;
pulsar_message_t *cMessage;
@@ -91,6 +95,14 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
}
}
+ if (consumerConfig.Has(CFG_INIT_POSITION) && consumerConfig.Get(CFG_INIT_POSITION).IsString()) {
+ std::string initPosition = consumerConfig.Get(CFG_INIT_POSITION).ToString().Utf8Value();
+ if (INIT_POSITION.count(initPosition)) {
+ pulsar_consumer_set_subscription_initial_position(this->cConsumerConfig,
+ INIT_POSITION.at(initPosition));
+ }
+ }
+
if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
if (!consumerName.empty())
pulsar_consumer_set_consumer_name(this->cConsumerConfig, consumerName.c_str());
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 7ef3fac..a0fad68 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -215,6 +215,75 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('subscriptionInitialPosition', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/subscriptionInitialPosition';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: false,
+ });
+ expect(producer).not.toBeNull();
+
+ const messages = [];
+ for (let i = 0; i < 2; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer.flush();
+
+ const latestConsumer = await client.subscribe({
+ topic,
+ subscription: 'latestSub',
+ subscriptionInitialPosition: 'Latest',
+ });
+ expect(latestConsumer).not.toBeNull();
+
+ const earliestConsumer = await client.subscribe({
+ topic,
+ subscription: 'earliestSub',
+ subscriptionInitialPosition: 'Earliest',
+ });
+ expect(earliestConsumer).not.toBeNull();
+
+ for (let i = 2; i < 4; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer.flush();
+
+ const latestResults = [];
+ const earliestResults = [];
+ for (let i = 0; i < 4; i += 1) {
+ if (i < 2) {
+ const latestMsg = await latestConsumer.receive(5000);
+ latestConsumer.acknowledge(latestMsg);
+ latestResults.push(latestMsg.getData().toString());
+ }
+
+ const earliestMsg = await earliestConsumer.receive(5000);
+ earliestConsumer.acknowledge(earliestMsg);
+ earliestResults.push(earliestMsg.getData().toString());
+ }
+ expect(lodash.difference(messages, latestResults)).toEqual(['my-message-0', 'my-message-1']);
+ expect(lodash.difference(messages, earliestResults)).toEqual([]);
+
+ await producer.close();
+ await latestConsumer.close();
+ await earliestConsumer.close();
+ await client.close();
+ });
+
test('Produce/Read', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',