This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 544e809  Support subscriptionInitialPosition setting (#66)
544e809 is described below

commit 544e809b2642cc689f35f3d52bedf2d5ab7a64c8
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Mon Feb 3 10:03:33 2020 +0900

    Support subscriptionInitialPosition setting (#66)
---
 src/ConsumerConfig.cc    | 12 +++++++++
 tests/end_to_end.test.js | 69 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 81 insertions(+)

diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 7ef2b79..b687b98 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -27,6 +27,7 @@
 static const std::string CFG_TOPIC = "topic";
 static const std::string CFG_SUBSCRIPTION = "subscription";
 static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
+static const std::string CFG_INIT_POSITION = "subscriptionInitialPosition";
 static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
 static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
 static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
@@ -40,6 +41,9 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
     {"Shared", pulsar_ConsumerShared},
     {"Failover", pulsar_ConsumerFailover}};
 
+static const std::map<std::string, initial_position> INIT_POSITION = {
+    {"Latest", initial_position_latest}, {"Earliest", initial_position_earliest}};
+
 struct MessageListenerProxyData {
   std::shared_ptr<CConsumerWrapper> consumerWrapper;
   pulsar_message_t *cMessage;
@@ -91,6 +95,14 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
     }
   }
 
+  if (consumerConfig.Has(CFG_INIT_POSITION) && consumerConfig.Get(CFG_INIT_POSITION).IsString()) {
+    std::string initPosition = consumerConfig.Get(CFG_INIT_POSITION).ToString().Utf8Value();
+    if (INIT_POSITION.count(initPosition)) {
+      pulsar_consumer_set_subscription_initial_position(this->cConsumerConfig,
+                                                        INIT_POSITION.at(initPosition));
+    }
+  }
+
   if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
     std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
     if (!consumerName.empty()) pulsar_consumer_set_consumer_name(this->cConsumerConfig, consumerName.c_str());
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 7ef3fac..a0fad68 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -215,6 +215,75 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('subscriptionInitialPosition', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/subscriptionInitialPosition';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: false,
+      });
+      expect(producer).not.toBeNull();
+
+      const messages = [];
+      for (let i = 0; i < 2; i += 1) {
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer.flush();
+
+      const latestConsumer = await client.subscribe({
+        topic,
+        subscription: 'latestSub',
+        subscriptionInitialPosition: 'Latest',
+      });
+      expect(latestConsumer).not.toBeNull();
+
+      const earliestConsumer = await client.subscribe({
+        topic,
+        subscription: 'earliestSub',
+        subscriptionInitialPosition: 'Earliest',
+      });
+      expect(earliestConsumer).not.toBeNull();
+
+      for (let i = 2; i < 4; i += 1) {
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer.flush();
+
+      const latestResults = [];
+      const earliestResults = [];
+      for (let i = 0; i < 4; i += 1) {
+        if (i < 2) {
+          const latestMsg = await latestConsumer.receive(5000);
+          latestConsumer.acknowledge(latestMsg);
+          latestResults.push(latestMsg.getData().toString());
+        }
+
+        const earliestMsg = await earliestConsumer.receive(5000);
+        earliestConsumer.acknowledge(earliestMsg);
+        earliestResults.push(earliestMsg.getData().toString());
+      }
+      expect(lodash.difference(messages, latestResults)).toEqual(['my-message-0', 'my-message-1']);
+      expect(lodash.difference(messages, earliestResults)).toEqual([]);
+
+      await producer.close();
+      await latestConsumer.close();
+      await earliestConsumer.close();
+      await client.close();
+    });
+
     test('Produce/Read', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',

Reply via email to