This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e1f8fd  [feat] End-to-end encryption support for Reader (#326)
2e1f8fd is described below

commit 2e1f8fdfd80be999bac8a606b458d9eefa6d17ab
Author: Robert Barbey <[email protected]>
AuthorDate: Fri May 19 14:26:09 2023 +0200

    [feat] End-to-end encryption support for Reader (#326)
    
    * feat: add handling of private key and crypto failure action
    
    * feat: expose fields for private key and crypto failure action
    
    * test: add tests for encrypted reader config
    
    * doc: add encryption reader example
---
 examples/encryption-reader.js | 44 +++++++++++++++++++++++++++++++++++++++++++
 index.d.ts                    |  2 ++
 src/ReaderConfig.cc           | 24 +++++++++++++++++++++++
 tests/end_to_end.test.js      | 41 ++++++++++++++++++++++++++++++++++++++++
 tstest.ts                     | 15 +++++++++++++++
 5 files changed, 126 insertions(+)

diff --git a/examples/encryption-reader.js b/examples/encryption-reader.js
new file mode 100644
index 0000000..a324cda
--- /dev/null
+++ b/examples/encryption-reader.js
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const Pulsar = require('../');
+
+(async () => {
+  // Create a client
+  const client = new Pulsar.Client({
+    serviceUrl: 'pulsar://localhost:6650',
+    operationTimeoutSeconds: 30
+  });
+
+  // Create a reader
+  const reader = await client.createReader({
+    topic: 'persistent://public/default/my-topic',
+    startMessageId: Pulsar.MessageId.earliest(),
+    privateKeyPath: './certificate/private-key.client-rsa.pem'
+  });
+
+  // Receive messages
+  for (let i = 0; i < 10; i += 1) {
+    const msg = await reader.readNext();
+    console.log(msg.getData().toString());
+  }
+
+  await reader.close();
+  await client.close();
+})();
diff --git a/index.d.ts b/index.d.ts
index ca1e489..97462dd 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -119,6 +119,8 @@ export interface ReaderConfig {
   subscriptionRolePrefix?: string;
   readCompacted?: boolean;
   listener?: (message: Message, reader: Reader) => void;
+  privateKeyPath?: string;
+  cryptoFailureAction?: ConsumerCryptoFailureAction;
 }
 
 export class Reader {
diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc
index 2513c0c..1e56063 100644
--- a/src/ReaderConfig.cc
+++ b/src/ReaderConfig.cc
@@ -19,6 +19,7 @@
 
 #include "ReaderConfig.h"
 #include "MessageId.h"
+#include <pulsar/c/consumer_configuration.h>
 #include <map>
 
 static const std::string CFG_TOPIC = "topic";
@@ -28,6 +29,14 @@ static const std::string CFG_READER_NAME = "readerName";
 static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
 static const std::string CFG_READ_COMPACTED = "readCompacted";
 static const std::string CFG_LISTENER = "listener";
+static const std::string CFG_PRIVATE_KEY_PATH = "privateKeyPath";
+static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
+
+static const std::map<std::string, pulsar_consumer_crypto_failure_action> CONSUMER_CRYPTO_FAILURE_ACTION = {
+    {"FAIL", pulsar_ConsumerFail},
+    {"DISCARD", pulsar_ConsumerDiscard},
+    {"CONSUME", pulsar_ConsumerConsume},
+};
 
 void FinalizeListenerCallback(Napi::Env env, ReaderListenerCallback *cb, void *) { delete cb; }
 
@@ -82,6 +91,21 @@ ReaderConfig::ReaderConfig(const Napi::Object &readerConfig, pulsar_reader_liste
     pulsar_reader_configuration_set_reader_listener(this->cReaderConfig.get(), readerListener,
                                                     this->listener);
   }
+
+  if (readerConfig.Has(CFG_PRIVATE_KEY_PATH) && readerConfig.Get(CFG_PRIVATE_KEY_PATH).IsString()) {
+    std::string publicKeyPath = "";
+    std::string privateKeyPath = readerConfig.Get(CFG_PRIVATE_KEY_PATH).ToString().Utf8Value();
+    pulsar_reader_configuration_set_default_crypto_key_reader(this->cReaderConfig.get(),
+                                                              publicKeyPath.c_str(), privateKeyPath.c_str());
+    if (readerConfig.Has(CFG_CRYPTO_FAILURE_ACTION) &&
+        readerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).IsString()) {
+      std::string cryptoFailureAction = readerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).ToString().Utf8Value();
+      if (CONSUMER_CRYPTO_FAILURE_ACTION.count(cryptoFailureAction)) {
+        pulsar_reader_configuration_set_crypto_failure_action(
+            this->cReaderConfig.get(), CONSUMER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
+      }
+    }
+  }
 }
 
 ReaderConfig::~ReaderConfig() {
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index dc382cf..1c33b2d 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -989,6 +989,47 @@ const Pulsar = require('../index.js');
       await consumer.close();
       await client.close();
     });
+    test('Basic produce and read encryption', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/encryption-produce-read';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+        publicKeyPath: `${__dirname}/certificate/public-key.client-rsa.pem`,
+        encryptionKey: 'encryption-key',
+      });
+
+      const reader = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.earliest(),
+        privateKeyPath: `${__dirname}/certificate/private-key.client-rsa.pem`,
+      });
+
+      const messages = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer.flush();
+
+      const results = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = await reader.readNext();
+        results.push(msg.getData().toString());
+      }
+      expect(lodash.difference(messages, results)).toEqual([]);
+      await producer.close();
+      await reader.close();
+      await client.close();
+    });
     test('Produce/Consume/Read/IsConnected', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',
diff --git a/tstest.ts b/tstest.ts
index bb19744..568e0b0 100644
--- a/tstest.ts
+++ b/tstest.ts
@@ -223,6 +223,19 @@ import Pulsar = require('./index');
     },
   });
 
+  const reader4: Pulsar.Reader = await client.createReader({
+    topic: 'persistent://public/default/my-topic',
+    startMessageId: Pulsar.MessageId.earliest(),
+    privateKeyPath: '/path/to/private.key',
+  });
+
+  const reader5: Pulsar.Reader = await client.createReader({
+    topic: 'persistent://public/default/my-topic',
+    startMessageId: Pulsar.MessageId.earliest(),
+    privateKeyPath: '/path/to/private.key',
+    cryptoFailureAction: 'CONSUME',
+  });
+
   const producerName: string = producer1.getProducerName();
   const topicName1: string = producer1.getTopic();
   const producerIsConnected: boolean = producer1.isConnected();
@@ -289,6 +302,8 @@ import Pulsar = require('./index');
   await reader1.close();
   await reader2.close();
   await reader3.close();
+  await reader4.close();
+  await reader5.close();
   await client.close();
 })();
 

Reply via email to